ListenableFuture provides CompletableFuture adaptation via completable()

Issue: SPR-15696
This commit is contained in:
Juergen Hoeller 2017-06-27 00:43:37 +02:00
parent 98642c7e29
commit 87430f3cd3
12 changed files with 284 additions and 51 deletions

View File

@ -150,7 +150,7 @@ public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport imple
* @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
*/
@Override
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,7 @@
package org.springframework.scheduling.annotation;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -48,7 +49,7 @@ public class AsyncResult<V> implements ListenableFuture<V> {
private final V value;
private final ExecutionException executionException;
private final Throwable executionException;
/**
@ -63,7 +64,7 @@ public class AsyncResult<V> implements ListenableFuture<V> {
* Create a new AsyncResult holder.
* @param value the value to pass through
*/
private AsyncResult(@Nullable V value, @Nullable ExecutionException ex) {
private AsyncResult(@Nullable V value, @Nullable Throwable ex) {
this.value = value;
this.executionException = ex;
}
@ -87,7 +88,9 @@ public class AsyncResult<V> implements ListenableFuture<V> {
@Override
public V get() throws ExecutionException {
if (this.executionException != null) {
throw this.executionException;
throw (this.executionException instanceof ExecutionException ?
(ExecutionException) this.executionException :
new ExecutionException(this.executionException));
}
return this.value;
}
@ -106,8 +109,7 @@ public class AsyncResult<V> implements ListenableFuture<V> {
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
if (this.executionException != null) {
Throwable cause = this.executionException.getCause();
failureCallback.onFailure(cause != null ? cause : this.executionException);
failureCallback.onFailure(exposedException(this.executionException));
}
else {
successCallback.onSuccess(this.value);
@ -118,6 +120,18 @@ public class AsyncResult<V> implements ListenableFuture<V> {
}
}
@Override
public CompletableFuture<V> completable() {
if (this.executionException != null) {
CompletableFuture<V> completable = new CompletableFuture<>();
completable.completeExceptionally(exposedException(this.executionException));
return completable;
}
else {
return CompletableFuture.completedFuture(this.value);
}
}
/**
* Create a new async result which exposes the given value from {@link Future#get()}.
@ -138,8 +152,23 @@ public class AsyncResult<V> implements ListenableFuture<V> {
* @see ExecutionException
*/
public static <V> ListenableFuture<V> forExecutionException(Throwable ex) {
return new AsyncResult<>(null,
(ex instanceof ExecutionException ? (ExecutionException) ex : new ExecutionException(ex)));
return new AsyncResult<>(null, ex);
}
/**
* Determine the exposed exception: either the cause of a given
* {@link ExecutionException}, or the original exception as-is.
* @param original the original as given to {@link #forExecutionException}
* @return the exposed exception
*/
private static Throwable exposedException(Throwable original) {
if (original instanceof ExecutionException) {
Throwable cause = original.getCause();
if (cause != null) {
return cause;
}
}
return original;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.scheduling.annotation;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
@ -33,7 +34,7 @@ import static org.junit.Assert.*;
public class AsyncResultTests {
@Test
public void asyncResultWithCallbackAndValue() {
public void asyncResultWithCallbackAndValue() throws Exception {
String value = "val";
final Set<String> values = new HashSet<>(1);
ListenableFuture<String> future = AsyncResult.forValue(value);
@ -48,10 +49,13 @@ public class AsyncResultTests {
}
});
assertSame(value, values.iterator().next());
assertSame(value, future.get());
assertSame(value, future.completable().get());
future.completable().thenAccept(v -> assertSame(value, v));
}
@Test
public void asyncResultWithCallbackAndException() {
public void asyncResultWithCallbackAndException() throws Exception {
IOException ex = new IOException();
final Set<Throwable> values = new HashSet<>(1);
ListenableFuture<String> future = AsyncResult.forExecutionException(ex);
@ -66,24 +70,55 @@ public class AsyncResultTests {
}
});
assertSame(ex, values.iterator().next());
try {
future.get();
fail("Should have thrown ExecutionException");
}
catch (ExecutionException ex2) {
assertSame(ex, ex2.getCause());
}
try {
future.completable().get();
fail("Should have thrown ExecutionException");
}
catch (ExecutionException ex2) {
assertSame(ex, ex2.getCause());
}
}
@Test
public void asyncResultWithSeparateCallbacksAndValue() {
public void asyncResultWithSeparateCallbacksAndValue() throws Exception {
String value = "val";
final Set<String> values = new HashSet<>(1);
ListenableFuture<String> future = AsyncResult.forValue(value);
future.addCallback(values::add, (ex) -> fail("Failure callback not expected: " + ex));
assertSame(value, values.iterator().next());
assertSame(value, future.get());
assertSame(value, future.completable().get());
future.completable().thenAccept(v -> assertSame(value, v));
}
@Test
public void asyncResultWithSeparateCallbacksAndException() {
public void asyncResultWithSeparateCallbacksAndException() throws Exception {
IOException ex = new IOException();
final Set<Throwable> values = new HashSet<>(1);
ListenableFuture<String> future = AsyncResult.forExecutionException(ex);
future.addCallback((result) -> fail("Success callback not expected: " + result), values::add);
assertSame(ex, values.iterator().next());
try {
future.get();
fail("Should have thrown ExecutionException");
}
catch (ExecutionException ex2) {
assertSame(ex, ex2.getCause());
}
try {
future.completable().get();
fail("Should have thrown ExecutionException");
}
catch (ExecutionException ex2) {
assertSame(ex, ex2.getCause());
}
}
}

View File

@ -73,6 +73,12 @@ public class CompletableToListenableFutureAdapter<T> implements ListenableFuture
this.callbacks.addFailureCallback(failureCallback);
}
@Override
public CompletableFuture<T> completable() {
return this.completableFuture;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return this.completableFuture.cancel(mayInterruptIfRunning);

View File

@ -0,0 +1,44 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
/**
* Extension of {@link CompletableFuture} which allows for cancelling
* a delegate along with the {@link CompletableFuture} itself.
*
* @author Juergen Hoeller
* @since 5.0
*/
class DelegatingCompletableFuture<T> extends CompletableFuture<T> {
private final Future<T> delegate;
public DelegatingCompletableFuture(Future<T> delegate) {
this.delegate = delegate;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = this.delegate.cancel(mayInterruptIfRunning);
super.cancel(mayInterruptIfRunning);
return result;
}
}

View File

@ -16,6 +16,7 @@
package org.springframework.util.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
/**
@ -27,6 +28,7 @@ import java.util.concurrent.Future;
*
* @author Arjen Poutsma
* @author Sebastien Deleuze
* @author Juergen Hoeller
* @since 4.0
*/
public interface ListenableFuture<T> extends Future<T> {
@ -45,4 +47,15 @@ public interface ListenableFuture<T> extends Future<T> {
*/
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
/**
* Expose this {@link ListenableFuture} as a JDK {@link CompletableFuture}.
* @since 5.0
*/
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
addCallback(completable::complete, completable::completeExceptionally);
return completable;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,8 @@ package org.springframework.util.concurrent;
import java.util.concurrent.ExecutionException;
import org.springframework.lang.Nullable;
/**
* Abstract class that adapts a {@link ListenableFuture} parameterized over S into a
* {@code ListenableFuture} parameterized over T. All methods are delegated to the
@ -51,19 +53,21 @@ public abstract class ListenableFutureAdapter<T, S> extends FutureAdapter<T, S>
ListenableFuture<S> listenableAdaptee = (ListenableFuture<S>) getAdaptee();
listenableAdaptee.addCallback(new ListenableFutureCallback<S>() {
@Override
public void onSuccess(S result) {
T adapted;
try {
adapted = adaptInternal(result);
}
catch (ExecutionException ex) {
Throwable cause = ex.getCause();
onFailure(cause != null ? cause : ex);
return;
}
catch (Throwable ex) {
onFailure(ex);
return;
public void onSuccess(@Nullable S result) {
T adapted = null;
if (result != null) {
try {
adapted = adaptInternal(result);
}
catch (ExecutionException ex) {
Throwable cause = ex.getCause();
onFailure(cause != null ? cause : ex);
return;
}
catch (Throwable ex) {
onFailure(ex);
return;
}
}
successCallback.onSuccess(adapted);
}

View File

@ -17,6 +17,7 @@
package org.springframework.util.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
@ -65,6 +66,14 @@ public class ListenableFutureTask<T> extends FutureTask<T> implements Listenable
this.callbacks.addFailureCallback(failureCallback);
}
@Override
public CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
this.callbacks.addSuccessCallback(completable::complete);
this.callbacks.addFailureCallback(completable::completeExceptionally);
return completable;
}
@Override
protected void done() {

View File

@ -17,6 +17,7 @@
package org.springframework.util.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -68,6 +69,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
return this.settableTask.setExceptionResult(exception);
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.settableTask.addCallback(callback);
@ -78,6 +80,12 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
this.settableTask.addCallback(successCallback, failureCallback);
}
@Override
public CompletableFuture<T> completable() {
return this.settableTask.completable();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = this.settableTask.cancel(mayInterruptIfRunning);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,7 @@ package org.springframework.util.concurrent;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
@ -34,12 +35,8 @@ public class ListenableFutureTaskTests {
@Test
public void success() throws Exception {
final String s = "Hello World";
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
return s;
}
};
Callable<String> callable = () -> s;
ListenableFutureTask<String> task = new ListenableFutureTask<>(callable);
task.addCallback(new ListenableFutureCallback<String>() {
@Override
@ -52,17 +49,19 @@ public class ListenableFutureTaskTests {
}
});
task.run();
assertSame(s, task.get());
assertSame(s, task.completable().get());
task.completable().thenAccept(v -> assertSame(s, v));
}
@Test
public void failure() throws Exception {
final String s = "Hello World";
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
throw new IOException(s);
}
Callable<String> callable = () -> {
throw new IOException(s);
};
ListenableFutureTask<String> task = new ListenableFutureTask<>(callable);
task.addCallback(new ListenableFutureCallback<String>() {
@Override
@ -75,12 +74,28 @@ public class ListenableFutureTaskTests {
}
});
task.run();
try {
task.get();
fail("Should have thrown ExecutionException");
}
catch (ExecutionException ex) {
assertSame(s, ex.getCause().getMessage());
}
try {
task.completable().get();
fail("Should have thrown ExecutionException");
}
catch (ExecutionException ex) {
assertSame(s, ex.getCause().getMessage());
}
}
@Test
public void successWithLambdas() throws Exception {
final String s = "Hello World";
Callable<String> callable = () -> s;
SuccessCallback<String> successCallback = mock(SuccessCallback.class);
FailureCallback failureCallback = mock(FailureCallback.class);
ListenableFutureTask<String> task = new ListenableFutureTask<>(callable);
@ -88,6 +103,10 @@ public class ListenableFutureTaskTests {
task.run();
verify(successCallback).onSuccess(s);
verifyZeroInteractions(failureCallback);
assertSame(s, task.get());
assertSame(s, task.completable().get());
task.completable().thenAccept(v -> assertSame(s, v));
}
@Test
@ -97,6 +116,7 @@ public class ListenableFutureTaskTests {
Callable<String> callable = () -> {
throw ex;
};
SuccessCallback<String> successCallback = mock(SuccessCallback.class);
FailureCallback failureCallback = mock(FailureCallback.class);
ListenableFutureTask<String> task = new ListenableFutureTask<>(callable);
@ -104,6 +124,21 @@ public class ListenableFutureTaskTests {
task.run();
verify(failureCallback).onFailure(ex);
verifyZeroInteractions(successCallback);
try {
task.get();
fail("Should have thrown ExecutionException");
}
catch (ExecutionException ex2) {
assertSame(s, ex2.getCause().getMessage());
}
try {
task.completable().get();
fail("Should have thrown ExecutionException");
}
catch (ExecutionException ex2) {
assertSame(s, ex2.getCause().getMessage());
}
}
}

View File

@ -18,6 +18,7 @@ package org.springframework.util.concurrent;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -52,6 +53,16 @@ public class SettableListenableFutureTests {
assertTrue(settableListenableFuture.isDone());
}
@Test
public void returnsSetValueFromCompletable() throws ExecutionException, InterruptedException {
String string = "hello";
assertTrue(settableListenableFuture.set(string));
Future<String> completable = settableListenableFuture.completable();
assertThat(completable.get(), equalTo(string));
assertFalse(completable.isCancelled());
assertTrue(completable.isDone());
}
@Test
public void setValueUpdatesDoneStatus() {
settableListenableFuture.set("hello");
@ -60,7 +71,7 @@ public class SettableListenableFutureTests {
}
@Test
public void throwsSetExceptionWrappedInExecutionException() throws ExecutionException, InterruptedException {
public void throwsSetExceptionWrappedInExecutionException() throws Exception {
Throwable exception = new RuntimeException();
assertTrue(settableListenableFuture.setException(exception));
@ -77,7 +88,25 @@ public class SettableListenableFutureTests {
}
@Test
public void throwsSetErrorWrappedInExecutionException() throws ExecutionException, InterruptedException {
public void throwsSetExceptionWrappedInExecutionExceptionFromCompletable() throws Exception {
Throwable exception = new RuntimeException();
assertTrue(settableListenableFuture.setException(exception));
Future<String> completable = settableListenableFuture.completable();
try {
completable.get();
fail("Expected ExecutionException");
}
catch (ExecutionException ex) {
assertThat(ex.getCause(), equalTo(exception));
}
assertFalse(completable.isCancelled());
assertTrue(completable.isDone());
}
@Test
public void throwsSetErrorWrappedInExecutionException() throws Exception {
Throwable exception = new OutOfMemoryError();
assertTrue(settableListenableFuture.setException(exception));
@ -93,6 +122,24 @@ public class SettableListenableFutureTests {
assertTrue(settableListenableFuture.isDone());
}
@Test
public void throwsSetErrorWrappedInExecutionExceptionFromCompletable() throws Exception {
Throwable exception = new OutOfMemoryError();
assertTrue(settableListenableFuture.setException(exception));
Future<String> completable = settableListenableFuture.completable();
try {
completable.get();
fail("Expected ExecutionException");
}
catch (ExecutionException ex) {
assertThat(ex.getCause(), equalTo(exception));
}
assertFalse(completable.isCancelled());
assertTrue(completable.isDone());
}
@Test
public void setValueTriggersCallback() {
String string = "hello";

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -48,14 +48,7 @@ import org.springframework.util.MultiValueMap;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
/**
* @author Arjen Poutsma
@ -78,6 +71,16 @@ public class AsyncRestTemplateIntegrationTests extends AbstractMockWebServerTest
assertEquals("Invalid status code", HttpStatus.OK, entity.getStatusCode());
}
@Test
public void getEntityFromCompletable() throws Exception {
ListenableFuture<ResponseEntity<String>> future = template.getForEntity(baseUrl + "/{method}", String.class, "get");
ResponseEntity<String> entity = future.completable().get();
assertEquals("Invalid content", helloWorld, entity.getBody());
assertFalse("No headers", entity.getHeaders().isEmpty());
assertEquals("Invalid content-type", textContentType, entity.getHeaders().getContentType());
assertEquals("Invalid status code", HttpStatus.OK, entity.getStatusCode());
}
@Test
public void multipleFutureGets() throws Exception {
Future<ResponseEntity<String>> future = template.getForEntity(baseUrl + "/{method}", String.class, "get");