From c9b99daa740ffe029a25b6ff46cb29a891ea097b Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 2 Feb 2017 19:54:08 +0100 Subject: [PATCH] SettableListenableFuture centralizes state in ListenableFutureTask subclass Issue: SPR-15216 --- .../util/concurrent/ListenableFutureTask.java | 5 +- .../concurrent/SettableListenableFuture.java | 127 ++++++++---------- 2 files changed, 60 insertions(+), 72 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java index 2bb8faa31b..9178ee9c93 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,8 +63,9 @@ public class ListenableFutureTask extends FutureTask implements Listenable this.callbacks.addFailureCallback(failureCallback); } + @Override - protected final void done() { + protected void done() { Throwable cause; try { T result = get(); diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java index db46e9af29..fa6bc00a34 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +20,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import org.springframework.util.Assert; -import org.springframework.util.ReflectionUtils; /** * A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} @@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils; */ public class SettableListenableFuture implements ListenableFuture { - private final SettableTask settableTask; - - private final ListenableFutureTask listenableFuture; + private static final Callable DUMMY_CALLABLE = new Callable() { + @Override + public Object call() throws Exception { + throw new IllegalStateException("Should never be called"); + } + }; - public SettableListenableFuture() { - this.settableTask = new SettableTask<>(); - this.listenableFuture = new ListenableFutureTask<>(this.settableTask); - } + private final SettableTask settableTask = new SettableTask<>(); /** @@ -58,11 +56,7 @@ public class SettableListenableFuture implements ListenableFuture { * @return {@code true} if the value was successfully set, else {@code false} */ public boolean set(T value) { - boolean success = this.settableTask.setValue(value); - if (success) { - this.listenableFuture.run(); - } - return success; + return this.settableTask.setResultValue(value); } /** @@ -74,27 +68,22 @@ public class SettableListenableFuture implements ListenableFuture { */ public boolean setException(Throwable exception) { Assert.notNull(exception, "Exception must not be null"); - boolean success = this.settableTask.setException(exception); - if (success) { - this.listenableFuture.run(); - } - return success; + return this.settableTask.setExceptionResult(exception); } @Override public void addCallback(ListenableFutureCallback callback) { - this.listenableFuture.addCallback(callback); + this.settableTask.addCallback(callback); } @Override public void addCallback(SuccessCallback successCallback, FailureCallback failureCallback) { - this.listenableFuture.addCallback(successCallback, failureCallback); + this.settableTask.addCallback(successCallback, failureCallback); } @Override public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = this.settableTask.setCancelled(); - this.listenableFuture.cancel(mayInterruptIfRunning); + boolean cancelled = this.settableTask.cancel(mayInterruptIfRunning); if (cancelled && mayInterruptIfRunning) { interruptTask(); } @@ -113,78 +102,76 @@ public class SettableListenableFuture implements ListenableFuture { /** * Retrieve the value. - *

Will return the value if it has been set via {@link #set(Object)}, - * throw an {@link java.util.concurrent.ExecutionException} if it has been - * set via {@link #setException(Throwable)} or throw a - * {@link java.util.concurrent.CancellationException} if it has been cancelled. - * @return The value associated with this future. + *

This method returns the value if it has been set via {@link #set(Object)}, + * throws an {@link java.util.concurrent.ExecutionException} if an exception has + * been set via {@link #setException(Throwable)}, or throws a + * {@link java.util.concurrent.CancellationException} if the future has been cancelled. + * @return the value associated with this future */ @Override public T get() throws InterruptedException, ExecutionException { - return this.listenableFuture.get(); + return this.settableTask.get(); } /** * Retrieve the value. - *

Will return the value if it has been set via {@link #set(Object)}, - * throw an {@link java.util.concurrent.ExecutionException} if it has been - * set via {@link #setException(Throwable)} or throw a - * {@link java.util.concurrent.CancellationException} if it has been cancelled. - * @param timeout the maximum time to wait. - * @param unit the time unit of the timeout argument. - * @return The value associated with this future. + *

This method returns the value if it has been set via {@link #set(Object)}, + * throws an {@link java.util.concurrent.ExecutionException} if an exception has + * been set via {@link #setException(Throwable)}, or throws a + * {@link java.util.concurrent.CancellationException} if the future has been cancelled. + * @param timeout the maximum time to wait + * @param unit the unit of the timeout argument + * @return the value associated with this future */ @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return this.listenableFuture.get(timeout, unit); + return this.settableTask.get(timeout, unit); } /** * Subclasses can override this method to implement interruption of the future's * computation. The method is invoked automatically by a successful call to * {@link #cancel(boolean) cancel(true)}. - *

The default implementation does nothing. + *

The default implementation is empty. */ protected void interruptTask() { } - private static class SettableTask implements Callable { + private static class SettableTask extends ListenableFutureTask { - private static final Object NO_VALUE = new Object(); - - private static final Object CANCELLED = new Object(); - - private final AtomicReference value = new AtomicReference<>(NO_VALUE); - - public boolean setValue(T value) { - return this.value.compareAndSet(NO_VALUE, value); - } - - public boolean setException(Throwable exception) { - return this.value.compareAndSet(NO_VALUE, exception); - } - - public boolean setCancelled() { - return this.value.compareAndSet(NO_VALUE, CANCELLED); - } - - public boolean isCancelled() { - return (this.value.get() == CANCELLED); - } - - public boolean isDone() { - return (this.value.get() != NO_VALUE); - } + private volatile Thread completingThread; @SuppressWarnings("unchecked") + public SettableTask() { + super((Callable) DUMMY_CALLABLE); + } + + public boolean setResultValue(T value) { + set(value); + return checkCompletingThread(); + } + + public boolean setExceptionResult(Throwable exception) { + setException(exception); + return checkCompletingThread(); + } + @Override - public T call() throws Exception { - Object val = this.value.get(); - if (val instanceof Throwable) { - ReflectionUtils.rethrowException((Throwable) val); + protected void done() { + if (!isCancelled()) { + // Implicitly invoked by set/setException: store current thread for + // determining whether the given result has actually triggered completion + // (since FutureTask.set/setException unfortunately don't expose that) + this.completingThread = Thread.currentThread(); } - return (T) val; + super.done(); + } + + private boolean checkCompletingThread() { + boolean check = (this.completingThread == Thread.currentThread()); + this.completingThread = null; // only first check actually counts + return check; } }