Refine ListenableFutureCallback policy for exceptions
This change updates all cases where callbacks are invoked to catch and suppress errors (since there is not match to do with and error from a callback be it success or failure). Also updated is the contract itself to clarify this and emphasize the callbacks are really notifications for the outcome of the ListenableFuture not the callbacks themselves. Issue: SPR-13785
This commit is contained in:
parent
037f351efd
commit
3dae3fd8a9
|
|
@ -37,6 +37,7 @@ import org.springframework.util.concurrent.SuccessCallback;
|
|||
* to the caller.
|
||||
*
|
||||
* @author Juergen Hoeller
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 3.0
|
||||
* @see Async
|
||||
* @see #forValue(Object)
|
||||
|
|
@ -103,10 +104,16 @@ public class AsyncResult<V> implements ListenableFuture<V> {
|
|||
@Override
|
||||
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
|
||||
try {
|
||||
successCallback.onSuccess(this.value);
|
||||
if (this.executionException != null) {
|
||||
Throwable cause = this.executionException.getCause();
|
||||
failureCallback.onFailure(cause != null ? cause : this.executionException);
|
||||
}
|
||||
else {
|
||||
successCallback.onSuccess(this.value);
|
||||
}
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
failureCallback.onFailure(ex);
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -17,8 +17,7 @@
|
|||
package org.springframework.util.concurrent;
|
||||
|
||||
/**
|
||||
* Defines the contract for failure callbacks that accept the result of a
|
||||
* {@link ListenableFuture}.
|
||||
* Failure callback for a {@link ListenableFuture}.
|
||||
*
|
||||
* @author Sebastien Deleuze
|
||||
* @since 4.1
|
||||
|
|
@ -26,8 +25,9 @@ package org.springframework.util.concurrent;
|
|||
public interface FailureCallback {
|
||||
|
||||
/**
|
||||
* Called when the {@link ListenableFuture} fails to complete.
|
||||
* @param ex the exception that triggered the failure
|
||||
* Called when the {@link ListenableFuture} completes with failure.
|
||||
* <p>Note that Exceptions raised by this method are ignored.
|
||||
* @param ex the failure
|
||||
*/
|
||||
void onFailure(Throwable ex);
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -19,11 +19,11 @@ package org.springframework.util.concurrent;
|
|||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* Extends the {@link Future} interface with the capability to accept completion
|
||||
* callbacks. If the future has already completed when the callback is added, the
|
||||
* callback will be triggered immediately.
|
||||
* Extend {@link Future} with the capability to accept completion callbacks.
|
||||
* If the future has completed when the callback is added, the callback is
|
||||
* triggered immediately.
|
||||
* <p>Inspired by {@code com.google.common.util.concurrent.ListenableFuture}.
|
||||
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Sebastien Deleuze
|
||||
* @since 4.0
|
||||
|
|
@ -31,20 +31,15 @@ import java.util.concurrent.Future;
|
|||
public interface ListenableFuture<T> extends Future<T> {
|
||||
|
||||
/**
|
||||
* Registers the given callback to this {@code ListenableFuture}. The callback will
|
||||
* be triggered when this {@code Future} is complete or, if it is already complete,
|
||||
* immediately.
|
||||
* Register the given {@code ListenableFutureCallback}.
|
||||
* @param callback the callback to register
|
||||
*/
|
||||
void addCallback(ListenableFutureCallback<? super T> callback);
|
||||
|
||||
/**
|
||||
* Registers the given success and failure callbacks to this {@code ListenableFuture}.
|
||||
* The callback will be triggered when this {@code Future} is complete or, if it is
|
||||
* already complete immediately. This is a Java 8 lambdas compliant alternative to
|
||||
* {@link #addCallback(ListenableFutureCallback)}.
|
||||
* @param successCallback the success callback to register
|
||||
* @param failureCallback the failure callback to register
|
||||
* Java 8 lambda-friendly alternative with success and failure callbacks.
|
||||
* @param successCallback the success callback
|
||||
* @param failureCallback the failure callback
|
||||
* @since 4.1
|
||||
*/
|
||||
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
|
||||
|
|
|
|||
|
|
@ -52,20 +52,34 @@ public abstract class ListenableFutureAdapter<T, S> extends FutureAdapter<T, S>
|
|||
listenableAdaptee.addCallback(new ListenableFutureCallback<S>() {
|
||||
@Override
|
||||
public void onSuccess(S result) {
|
||||
T adapted;
|
||||
try {
|
||||
successCallback.onSuccess(adaptInternal(result));
|
||||
adapted = adaptInternal(result);
|
||||
}
|
||||
catch (ExecutionException ex) {
|
||||
Throwable cause = ex.getCause();
|
||||
onFailure(cause != null ? cause : ex);
|
||||
return;
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
onFailure(ex);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
successCallback.onSuccess(adapted);
|
||||
}
|
||||
catch (Throwable e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable ex) {
|
||||
failureCallback.onFailure(ex);
|
||||
try {
|
||||
failureCallback.onFailure(ex);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
package org.springframework.util.concurrent;
|
||||
|
||||
/**
|
||||
* Defines the contract for callbacks that accept the result of a
|
||||
* Callback mechanism for the outcome, success or failure, from a
|
||||
* {@link ListenableFuture}.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -22,12 +22,14 @@ import java.util.Queue;
|
|||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Registry for {@link ListenableFutureCallback} instances.
|
||||
* Helper class for {@link ListenableFuture} implementations that maintains a
|
||||
* of success and failure callbacks and helps to notify them.
|
||||
*
|
||||
* <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Sebastien Deleuze
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
*/
|
||||
public class ListenableFutureCallbackRegistry<T> {
|
||||
|
|
@ -47,7 +49,6 @@ public class ListenableFutureCallbackRegistry<T> {
|
|||
* Add the given callback to this registry.
|
||||
* @param callback the callback to add
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void addCallback(ListenableFutureCallback<? super T> callback) {
|
||||
Assert.notNull(callback, "'callback' must not be null");
|
||||
synchronized (this.mutex) {
|
||||
|
|
@ -57,15 +58,34 @@ public class ListenableFutureCallbackRegistry<T> {
|
|||
this.failureCallbacks.add(callback);
|
||||
break;
|
||||
case SUCCESS:
|
||||
callback.onSuccess((T) this.result);
|
||||
notifySuccess(callback);
|
||||
break;
|
||||
case FAILURE:
|
||||
callback.onFailure((Throwable) this.result);
|
||||
notifyFailure(callback);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void notifySuccess(SuccessCallback<? super T> callback) {
|
||||
try {
|
||||
callback.onSuccess((T) this.result);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyFailure(FailureCallback callback) {
|
||||
try {
|
||||
callback.onFailure((Throwable) this.result);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the given success callback to this registry.
|
||||
* @param callback the success callback to add
|
||||
|
|
@ -80,7 +100,7 @@ public class ListenableFutureCallbackRegistry<T> {
|
|||
this.successCallbacks.add(callback);
|
||||
break;
|
||||
case SUCCESS:
|
||||
callback.onSuccess((T) this.result);
|
||||
notifySuccess(callback);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -99,7 +119,7 @@ public class ListenableFutureCallbackRegistry<T> {
|
|||
this.failureCallbacks.add(callback);
|
||||
break;
|
||||
case FAILURE:
|
||||
callback.onFailure((Throwable) this.result);
|
||||
notifyFailure(callback);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -115,7 +135,7 @@ public class ListenableFutureCallbackRegistry<T> {
|
|||
this.state = State.SUCCESS;
|
||||
this.result = result;
|
||||
while (!this.successCallbacks.isEmpty()) {
|
||||
this.successCallbacks.poll().onSuccess(result);
|
||||
notifySuccess(this.successCallbacks.poll());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -130,7 +150,7 @@ public class ListenableFutureCallbackRegistry<T> {
|
|||
this.state = State.FAILURE;
|
||||
this.result = ex;
|
||||
while (!this.failureCallbacks.isEmpty()) {
|
||||
this.failureCallbacks.poll().onFailure(ex);
|
||||
notifyFailure(this.failureCallbacks.poll());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -17,8 +17,7 @@
|
|||
package org.springframework.util.concurrent;
|
||||
|
||||
/**
|
||||
* Defines the contract for success callbacks that accept the result of a
|
||||
* {@link ListenableFuture}.
|
||||
* Success callback for a {@link ListenableFuture}.
|
||||
*
|
||||
* @author Sebastien Deleuze
|
||||
* @since 4.1
|
||||
|
|
@ -26,7 +25,8 @@ package org.springframework.util.concurrent;
|
|||
public interface SuccessCallback<T> {
|
||||
|
||||
/**
|
||||
* Called when the {@link ListenableFuture} successfully completes.
|
||||
* Called when the {@link ListenableFuture} completes with success.
|
||||
* <p>Note that Exceptions raised by this method are ignored.
|
||||
* @param result the result
|
||||
*/
|
||||
void onSuccess(T result);
|
||||
|
|
|
|||
|
|
@ -53,12 +53,15 @@ abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements Listena
|
|||
this.promise.onSuccess(new Consumer<S>() {
|
||||
@Override
|
||||
public void accept(S result) {
|
||||
T adapted;
|
||||
try {
|
||||
registry.success(adapt(result));
|
||||
adapted = adapt(result);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
registry.failure(ex);
|
||||
return;
|
||||
}
|
||||
registry.success(adapted);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue