diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java index 9e3870556b..c093c75c7b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java @@ -89,7 +89,8 @@ abstract class AbstractMonoToListenableFutureAdapter implements Listenable return false; } this.monoProcessor.cancel(); - return true; + // isCancelled may still return false, if mono completed before the cancel + return this.monoProcessor.isCancelled(); } @Override diff --git a/spring-messaging/src/test/java/org/springframework/messaging/support/MonoToListenableFutureAdapterTests.java b/spring-messaging/src/test/java/org/springframework/messaging/support/MonoToListenableFutureAdapterTests.java new file mode 100644 index 0000000000..e9f3c7a905 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/support/MonoToListenableFutureAdapterTests.java @@ -0,0 +1,72 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.support; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import reactor.core.publisher.Mono; + +import org.springframework.util.concurrent.ListenableFuture; + +import static org.junit.Assert.*; + +/** + * Unit tests for {@link MonoToListenableFutureAdapter}. + * @author Rossen Stoyanchev + */ +public class MonoToListenableFutureAdapterTests { + + @Test + public void success() { + String expected = "one"; + AtomicReference actual = new AtomicReference<>(); + ListenableFuture future = new MonoToListenableFutureAdapter<>(Mono.just(expected)); + future.addCallback(actual::set, actual::set); + + assertEquals(expected, actual.get()); + } + + @Test + public void failure() { + Throwable expected = new IllegalStateException("oops"); + AtomicReference actual = new AtomicReference<>(); + ListenableFuture future = new MonoToListenableFutureAdapter<>(Mono.error(expected)); + future.addCallback(actual::set, actual::set); + + assertEquals(expected, actual.get()); + } + + @Test + public void cancellation() { + Mono mono = Mono.delay(Duration.ofSeconds(60)); + Future future = new MonoToListenableFutureAdapter<>(mono); + + assertTrue(future.cancel(true)); + assertTrue(future.isCancelled()); + } + + @Test + public void cancellationAfterTerminated() { + Future future = new MonoToListenableFutureAdapter<>(Mono.empty()); + + assertFalse("Should return false if task already completed", future.cancel(true)); + assertFalse(future.isCancelled()); + } + +}