diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java index ae6ecc4271b..48edaf62dae 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java @@ -37,6 +37,7 @@ import org.springframework.util.concurrent.SuccessCallback; * to the caller. * * @author Juergen Hoeller + * @author Rossen Stoyanchev * @since 3.0 * @see Async * @see #forValue(Object) @@ -103,10 +104,16 @@ public class AsyncResult implements ListenableFuture { @Override public void addCallback(SuccessCallback successCallback, FailureCallback failureCallback) { try { - successCallback.onSuccess(this.value); + if (this.executionException != null) { + Throwable cause = this.executionException.getCause(); + failureCallback.onFailure(cause != null ? cause : this.executionException); + } + else { + successCallback.onSuccess(this.value); + } } catch (Throwable ex) { - failureCallback.onFailure(ex); + // Ignore } } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/FailureCallback.java b/spring-core/src/main/java/org/springframework/util/concurrent/FailureCallback.java index caac2874b5b..da55327e830 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/FailureCallback.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/FailureCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,7 @@ package org.springframework.util.concurrent; /** - * Defines the contract for failure callbacks that accept the result of a - * {@link ListenableFuture}. + * Failure callback for a {@link ListenableFuture}. * * @author Sebastien Deleuze * @since 4.1 @@ -26,8 +25,9 @@ package org.springframework.util.concurrent; public interface FailureCallback { /** - * Called when the {@link ListenableFuture} fails to complete. - * @param ex the exception that triggered the failure + * Called when the {@link ListenableFuture} completes with failure. + *

Note that Exceptions raised by this method are ignored. + * @param ex the failure */ void onFailure(Throwable ex); diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java index 6250b08f583..cf84909df90 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,11 +19,11 @@ package org.springframework.util.concurrent; import java.util.concurrent.Future; /** - * Extends the {@link Future} interface with the capability to accept completion - * callbacks. If the future has already completed when the callback is added, the - * callback will be triggered immediately. + * Extend {@link Future} with the capability to accept completion callbacks. + * If the future has completed when the callback is added, the callback is + * triggered immediately. *

Inspired by {@code com.google.common.util.concurrent.ListenableFuture}. - + * * @author Arjen Poutsma * @author Sebastien Deleuze * @since 4.0 @@ -31,20 +31,15 @@ import java.util.concurrent.Future; public interface ListenableFuture extends Future { /** - * Registers the given callback to this {@code ListenableFuture}. The callback will - * be triggered when this {@code Future} is complete or, if it is already complete, - * immediately. + * Register the given {@code ListenableFutureCallback}. * @param callback the callback to register */ void addCallback(ListenableFutureCallback callback); /** - * Registers the given success and failure callbacks to this {@code ListenableFuture}. - * The callback will be triggered when this {@code Future} is complete or, if it is - * already complete immediately. This is a Java 8 lambdas compliant alternative to - * {@link #addCallback(ListenableFutureCallback)}. - * @param successCallback the success callback to register - * @param failureCallback the failure callback to register + * Java 8 lambda-friendly alternative with success and failure callbacks. + * @param successCallback the success callback + * @param failureCallback the failure callback * @since 4.1 */ void addCallback(SuccessCallback successCallback, FailureCallback failureCallback); diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java index 268a64964d6..8d091832647 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java @@ -52,20 +52,34 @@ public abstract class ListenableFutureAdapter extends FutureAdapter listenableAdaptee.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(S result) { + T adapted; try { - successCallback.onSuccess(adaptInternal(result)); + adapted = adaptInternal(result); } catch (ExecutionException ex) { Throwable cause = ex.getCause(); onFailure(cause != null ? cause : ex); + return; } catch (Throwable ex) { onFailure(ex); + return; + } + try { + successCallback.onSuccess(adapted); + } + catch (Throwable e) { + // Ignore } } @Override public void onFailure(Throwable ex) { - failureCallback.onFailure(ex); + try { + failureCallback.onFailure(ex); + } + catch (Throwable t) { + // Ignore + } } }); } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java index 633794458cb..515ee4061cf 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ package org.springframework.util.concurrent; /** - * Defines the contract for callbacks that accept the result of a + * Callback mechanism for the outcome, success or failure, from a * {@link ListenableFuture}. * * @author Arjen Poutsma diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java index 1c4be730da7..d9964598bdc 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,12 +22,14 @@ import java.util.Queue; import org.springframework.util.Assert; /** - * Registry for {@link ListenableFutureCallback} instances. + * Helper class for {@link ListenableFuture} implementations that maintains a + * of success and failure callbacks and helps to notify them. * *

Inspired by {@code com.google.common.util.concurrent.ExecutionList}. * * @author Arjen Poutsma * @author Sebastien Deleuze + * @author Rossen Stoyanchev * @since 4.0 */ public class ListenableFutureCallbackRegistry { @@ -47,7 +49,6 @@ public class ListenableFutureCallbackRegistry { * Add the given callback to this registry. * @param callback the callback to add */ - @SuppressWarnings("unchecked") public void addCallback(ListenableFutureCallback callback) { Assert.notNull(callback, "'callback' must not be null"); synchronized (this.mutex) { @@ -57,15 +58,34 @@ public class ListenableFutureCallbackRegistry { this.failureCallbacks.add(callback); break; case SUCCESS: - callback.onSuccess((T) this.result); + notifySuccess(callback); break; case FAILURE: - callback.onFailure((Throwable) this.result); + notifyFailure(callback); break; } } } + @SuppressWarnings("unchecked") + private void notifySuccess(SuccessCallback callback) { + try { + callback.onSuccess((T) this.result); + } + catch (Throwable ex) { + // Ignore + } + } + + private void notifyFailure(FailureCallback callback) { + try { + callback.onFailure((Throwable) this.result); + } + catch (Throwable ex) { + // Ignore + } + } + /** * Add the given success callback to this registry. * @param callback the success callback to add @@ -80,7 +100,7 @@ public class ListenableFutureCallbackRegistry { this.successCallbacks.add(callback); break; case SUCCESS: - callback.onSuccess((T) this.result); + notifySuccess(callback); break; } } @@ -99,7 +119,7 @@ public class ListenableFutureCallbackRegistry { this.failureCallbacks.add(callback); break; case FAILURE: - callback.onFailure((Throwable) this.result); + notifyFailure(callback); break; } } @@ -115,7 +135,7 @@ public class ListenableFutureCallbackRegistry { this.state = State.SUCCESS; this.result = result; while (!this.successCallbacks.isEmpty()) { - this.successCallbacks.poll().onSuccess(result); + notifySuccess(this.successCallbacks.poll()); } } } @@ -130,7 +150,7 @@ public class ListenableFutureCallbackRegistry { this.state = State.FAILURE; this.result = ex; while (!this.failureCallbacks.isEmpty()) { - this.failureCallbacks.poll().onFailure(ex); + notifyFailure(this.failureCallbacks.poll()); } } } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SuccessCallback.java b/spring-core/src/main/java/org/springframework/util/concurrent/SuccessCallback.java index 65f3041162f..269f8044765 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/SuccessCallback.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SuccessCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,7 @@ package org.springframework.util.concurrent; /** - * Defines the contract for success callbacks that accept the result of a - * {@link ListenableFuture}. + * Success callback for a {@link ListenableFuture}. * * @author Sebastien Deleuze * @since 4.1 @@ -26,7 +25,8 @@ package org.springframework.util.concurrent; public interface SuccessCallback { /** - * Called when the {@link ListenableFuture} successfully completes. + * Called when the {@link ListenableFuture} completes with success. + *

Note that Exceptions raised by this method are ignored. * @param result the result */ void onSuccess(T result); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java index d2d6bc4ebf3..a96fba8feb1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java @@ -53,12 +53,15 @@ abstract class AbstractPromiseToListenableFutureAdapter implements Listena this.promise.onSuccess(new Consumer() { @Override public void accept(S result) { + T adapted; try { - registry.success(adapt(result)); + adapted = adapt(result); } catch (Throwable ex) { registry.failure(ex); + return; } + registry.success(adapted); } });