Deprecate ListenableFuture in favor of CompletableFuture

This commit deprecates ListenableFuture in favor of CompletableFuture.
ListenableFuture was introduced in Spring Framework 4.0, when
CompletableFuture was not yet available. Spring now requires JDK 17, so
having our own type no longer seems necessary.

Major changes in this commit include:
- Deprecation of ListenableFuture and related types
  (ListenableFutureCallback, SettableListenableFuture, etc.)
- Deprecation of AsyncListenableTaskExecutor in favor of default methods
  in AsyncTaskExecutor (submitCompletable).
- AsyncHandlerMethodReturnValueHandler now has toCompletableFuture
  instead of toListenableFuture.
- WebSocketClient now has execute methods, which do the same as
  doHandshake, but return CompletableFutures (cf. the reactive
  WebSocketClient).

All other changes
- add an overloaded method that takes a CompletableFuture parameter
  instead of ListenableFuture, and/or
- add a method with a 'Async' suffix that returns a CompletableFuture
  instead of a ListenableFuture (connectAsync, sendAsync).

Closes gh-27780
This commit is contained in:
Arjen Poutsma 2022-03-17 12:18:00 +01:00
parent 735051bf7d
commit 2aa74c9121
74 changed files with 1148 additions and 380 deletions

View File

@ -20,7 +20,6 @@ import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@ -158,7 +157,6 @@ public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
/**
* Determine the specific executor to use when executing the given method.
* <p>Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
* @return the executor to use (or {@code null}, but just if no default executor is available)
*/
@Nullable
@ -176,8 +174,8 @@ public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
executor = (targetExecutor instanceof AsyncTaskExecutor ?
(AsyncTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
@ -276,17 +274,11 @@ public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
* @param returnType the declared return type (potentially a {@link Future} variant)
* @return the execution result (potentially a corresponding {@link Future} handle)
*/
@SuppressWarnings("deprecation")
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
return executor.submitCompletable(task);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);

View File

@ -47,6 +47,7 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* @see org.springframework.core.task.TaskExecutor
* @see SchedulerFactoryBean#setTaskExecutor
*/
@SuppressWarnings("deprecation")
public class SimpleThreadPoolTaskExecutor extends SimpleThreadPool
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, InitializingBean, DisposableBean {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -262,6 +262,7 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
return new Object[] {event};
}
@SuppressWarnings("deprecation")
protected void handleResult(Object result) {
if (reactiveStreamsPresent && new ReactiveResultHandler().subscribeToPublisher(result)) {
if (logger.isTraceEnabled()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -45,7 +45,9 @@ import org.springframework.util.concurrent.SuccessCallback;
* @see Async
* @see #forValue(Object)
* @see #forExecutionException(Throwable)
* @deprecated as of 6.0, in favor of {@link CompletableFuture}
*/
@Deprecated
public class AsyncResult<V> implements ListenableFuture<V> {
@Nullable

View File

@ -62,6 +62,7 @@ import org.springframework.util.concurrent.ListenableFuture;
* @see DefaultManagedTaskExecutor
* @see ThreadPoolTaskExecutor
*/
@SuppressWarnings("deprecation")
public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
@Nullable

View File

@ -80,7 +80,7 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* @see ThreadPoolExecutorFactoryBean
* @see ConcurrentTaskExecutor
*/
@SuppressWarnings("serial")
@SuppressWarnings({"serial", "deprecation"})
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

View File

@ -59,7 +59,7 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* @see #setThreadFactory
* @see #setErrorHandler
*/
@SuppressWarnings("serial")
@SuppressWarnings({"serial", "deprecation"})
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,10 +18,13 @@ package org.springframework.scheduling.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -33,6 +36,7 @@ import org.junit.jupiter.api.TestInfo;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.ListenableFuture;
import static org.assertj.core.api.Assertions.assertThat;
@ -135,6 +139,21 @@ abstract class AbstractSchedulingTaskExecutorTests {
assertThreadNamePrefix(task);
}
@Test
void submitCompletableRunnable() throws Exception {
TestTask task = new TestTask(this.testName, 1);
// Act
CompletableFuture<Void> future = executor.submitCompletable(task);
future.whenComplete(this::storeOutcome);
// Assert
Awaitility.await()
.atMost(1, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(future::isDone);
assertThat(outcome).isNull();
assertThreadNamePrefix(task);
}
@Test
void submitFailingListenableRunnable() throws Exception {
TestTask task = new TestTask(this.testName, 0);
@ -149,6 +168,20 @@ abstract class AbstractSchedulingTaskExecutorTests {
assertThat(outcome.getClass()).isSameAs(RuntimeException.class);
}
@Test
void submitFailingCompletableRunnable() throws Exception {
TestTask task = new TestTask(this.testName, 0);
CompletableFuture<?> future = executor.submitCompletable(task);
future.whenComplete(this::storeOutcome);
Awaitility.await()
.dontCatchUncaughtExceptions()
.atMost(1, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> future.isDone() && outcome != null);
assertThat(outcome.getClass()).isSameAs(CompletionException.class);
}
@Test
void submitListenableRunnableWithGetAfterShutdown() throws Exception {
ListenableFuture<?> future1 = executor.submitListenable(new TestTask(this.testName, -1));
@ -169,6 +202,26 @@ abstract class AbstractSchedulingTaskExecutorTests {
future2.get(1000, TimeUnit.MILLISECONDS)));
}
@Test
void submitCompletableRunnableWithGetAfterShutdown() throws Exception {
CompletableFuture<?> future1 = executor.submitCompletable(new TestTask(this.testName, -1));
CompletableFuture<?> future2 = executor.submitCompletable(new TestTask(this.testName, -1));
shutdownExecutor();
try {
future1.get(1000, TimeUnit.MILLISECONDS);
}
catch (Exception ex) {
/* ignore */
}
Awaitility.await()
.atMost(4, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.untilAsserted(() ->
assertThatExceptionOfType(TimeoutException.class).isThrownBy(() ->
future2.get(1000, TimeUnit.MILLISECONDS)));
}
@Test
void submitCallable() throws Exception {
TestCallable task = new TestCallable(this.testName, 1);
@ -246,6 +299,57 @@ abstract class AbstractSchedulingTaskExecutorTests {
});
}
@Test
void submitCompletableCallable() throws Exception {
TestCallable task = new TestCallable(this.testName, 1);
// Act
CompletableFuture<String> future = this.executor.submitCompletable(task);
future.whenComplete(this::storeOutcome);
// Assert
Awaitility.await()
.atMost(1, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> future.isDone() && outcome != null);
assertThat(outcome.toString().substring(0, this.threadNamePrefix.length())).isEqualTo(this.threadNamePrefix);
}
@Test
void submitFailingCompletableCallable() throws Exception {
TestCallable task = new TestCallable(this.testName, 0);
// Act
CompletableFuture<String> future = this.executor.submitCompletable(task);
future.whenComplete(this::storeOutcome);
// Assert
Awaitility.await()
.dontCatchUncaughtExceptions()
.atMost(1, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> future.isDone() && outcome != null);
assertThat(outcome.getClass()).isSameAs(CompletionException.class);
}
@Test
void submitCompletableCallableWithGetAfterShutdown() throws Exception {
CompletableFuture<?> future1 = executor.submitCompletable(new TestCallable(this.testName, -1));
CompletableFuture<?> future2 = executor.submitCompletable(new TestCallable(this.testName, -1));
shutdownExecutor();
assertThatExceptionOfType(TimeoutException.class).isThrownBy(() -> {
future1.get(1000, TimeUnit.MILLISECONDS);
future2.get(1000, TimeUnit.MILLISECONDS);
});
}
private void storeOutcome(@Nullable Object o, @Nullable Throwable t) {
if (o != null) {
this.outcome = o;
}
else if (t != null) {
this.outcome = t;
}
}
protected void assertThreadNamePrefix(TestTask task) {
assertThat(task.lastThread.getName().substring(0, this.threadNamePrefix.length())).isEqualTo(this.threadNamePrefix);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,7 +27,11 @@ import org.springframework.util.concurrent.ListenableFuture;
* @author Arjen Poutsma
* @since 4.0
* @see ListenableFuture
* @deprecated as of 6.0, in favor of
* {@link AsyncTaskExecutor#submitCompletable(Runnable)} and
* {@link AsyncTaskExecutor#submitCompletable(Callable)}
*/
@Deprecated
public interface AsyncListenableTaskExecutor extends AsyncTaskExecutor {
/**
@ -36,7 +40,9 @@ public interface AsyncListenableTaskExecutor extends AsyncTaskExecutor {
* @param task the {@code Runnable} to execute (never {@code null})
* @return a {@code ListenableFuture} representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted
* @deprecated in favor of {@link AsyncTaskExecutor#submitCompletable(Runnable)}
*/
@Deprecated
ListenableFuture<?> submitListenable(Runnable task);
/**
@ -46,7 +52,9 @@ public interface AsyncListenableTaskExecutor extends AsyncTaskExecutor {
* @param task the {@code Callable} to execute (never {@code null})
* @return a {@code ListenableFuture} representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted
* @deprecated in favor of {@link AsyncTaskExecutor#submitCompletable(Callable)}
*/
@Deprecated
<T> ListenableFuture<T> submitListenable(Callable<T> task);
}

View File

@ -17,8 +17,11 @@
package org.springframework.core.task;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.springframework.util.concurrent.FutureUtils;
/**
* Extended interface for asynchronous {@link TaskExecutor} implementations,
* offering support for {@link java.util.concurrent.Callable}.
@ -91,4 +94,29 @@ public interface AsyncTaskExecutor extends TaskExecutor {
*/
<T> Future<T> submit(Callable<T> task);
/**
* Submit a {@code Runnable} task for execution, receiving a {@code CompletableFuture}
* representing that task. The Future will return a {@code null} result upon completion.
* @param task the {@code Runnable} to execute (never {@code null})
* @return a {@code CompletableFuture} representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted
* @since 6.0
*/
default CompletableFuture<Void> submitCompletable(Runnable task) {
return CompletableFuture.runAsync(task, this);
}
/**
* Submit a {@code Callable} task for execution, receiving a {@code CompletableFuture}
* representing that task. The Future will return the Callable's result upon
* completion.
* @param task the {@code Callable} to execute (never {@code null})
* @return a {@code CompletableFuture} representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted
* @since 6.0
*/
default <T> CompletableFuture<T> submitCompletable(Callable<T> task) {
return FutureUtils.callAsync(task, this);
}
}

View File

@ -46,7 +46,7 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* @see SyncTaskExecutor
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
*/
@SuppressWarnings("serial")
@SuppressWarnings({"serial", "deprecation"})
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
implements AsyncListenableTaskExecutor, Serializable {

View File

@ -43,6 +43,7 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* @see java.util.concurrent.ExecutorService
* @see java.util.concurrent.Executors
*/
@SuppressWarnings("deprecation")
public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
private final Executor concurrentExecutor;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,7 +30,9 @@ import java.util.concurrent.TimeoutException;
* @author Juergen Hoeller
* @since 4.2
* @param <T> the result type returned by this Future's {@code get} method
* @deprecated as of 6.0, with no concrete replacement
*/
@Deprecated
public class CompletableToListenableFutureAdapter<T> implements ListenableFuture<T> {
private final CompletableFuture<T> completableFuture;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,12 +16,17 @@
package org.springframework.util.concurrent;
import java.util.function.BiConsumer;
/**
* Failure callback for a {@link ListenableFuture}.
*
* @author Sebastien Deleuze
* @since 4.1
* @deprecated as of 6.0, in favor of
* {@link java.util.concurrent.CompletableFuture#whenComplete(BiConsumer)}
*/
@Deprecated
@FunctionalInterface
public interface FailureCallback {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -33,7 +33,9 @@ import org.springframework.util.Assert;
* @since 4.0
* @param <T> the type of this {@code Future}
* @param <S> the type of the adaptee's {@code Future}
* @deprecated as of 6.0, with no concrete replacement
*/
@Deprecated
public abstract class FutureAdapter<T, S> implements Future<T> {
private final Future<S> adaptee;

View File

@ -0,0 +1,82 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import org.springframework.util.Assert;
/**
* Convenience utilities for working with {@link java.util.concurrent.Future}
* and implementations.
*
* @author Arjen Poutsma
* @since 6.0
*/
public abstract class FutureUtils {
/**
* Return a new {@code CompletableFuture} that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} with
* the value obtained by calling the given {@code Callable}.
* @param callable a function that returns the value to be used, or throws
* an exception
* @return the new CompletableFuture
*/
public static <T> CompletableFuture<T> callAsync(Callable<T> callable) {
Assert.notNull(callable, "Callable must not be null");
CompletableFuture<T> result = new CompletableFuture<>();
return result.completeAsync(toSupplier(callable, result));
}
/**
* Return a new {@code CompletableFuture} that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
* @param callable a function that returns the value to be used, or throws
* an exception
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public static <T> CompletableFuture<T> callAsync(Callable<T> callable, Executor executor) {
Assert.notNull(callable, "Callable must not be null");
Assert.notNull(executor, "Executor must not be null");
CompletableFuture<T> result = new CompletableFuture<>();
return result.completeAsync(toSupplier(callable, result), executor);
}
private static <T> Supplier<T> toSupplier(Callable<T> callable, CompletableFuture<T> result) {
return () -> {
try {
return callable.call();
}
catch (Exception ex) {
// wrap the exception just like CompletableFuture::supplyAsync does
result.completeExceptionally((ex instanceof CompletionException) ? ex : new CompletionException(ex));
return null;
}
};
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,7 @@ package org.springframework.util.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
/**
* Extend {@link Future} with the capability to accept completion callbacks.
@ -31,13 +32,18 @@ import java.util.concurrent.Future;
* @author Juergen Hoeller
* @since 4.0
* @param <T> the result type returned by this Future's {@code get} method
* @deprecated as of 6.0, in favor of {@link CompletableFuture}
*/
@Deprecated
public interface ListenableFuture<T> extends Future<T> {
/**
* Register the given {@code ListenableFutureCallback}.
* @param callback the callback to register
* @deprecated as of 6.0, in favor of
* {@link CompletableFuture#whenComplete(BiConsumer)}
*/
@Deprecated
void addCallback(ListenableFutureCallback<? super T> callback);
/**
@ -45,7 +51,10 @@ public interface ListenableFuture<T> extends Future<T> {
* @param successCallback the success callback
* @param failureCallback the failure callback
* @since 4.1
* @deprecated as of 6.0, in favor of
* {@link CompletableFuture#whenComplete(BiConsumer)}
*/
@Deprecated
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -31,7 +31,10 @@ import org.springframework.lang.Nullable;
* @since 4.0
* @param <T> the type of this {@code Future}
* @param <S> the type of the adaptee's {@code Future}
* @deprecated as of 6.0, in favor of
* {@link java.util.concurrent.CompletableFuture}
*/
@Deprecated
public abstract class ListenableFutureAdapter<T, S> extends FutureAdapter<T, S> implements ListenableFuture<T> {
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.util.concurrent;
import java.util.function.BiConsumer;
/**
* Callback mechanism for the outcome, success or failure, from a
* {@link ListenableFuture}.
@ -24,7 +26,10 @@ package org.springframework.util.concurrent;
* @author Sebastien Deleuze
* @since 4.0
* @param <T> the result type
* @deprecated as of 6.0, in favor of
* {@link java.util.concurrent.CompletableFuture#whenComplete(BiConsumer)}
*/
@Deprecated
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback {
}

View File

@ -33,7 +33,9 @@ import org.springframework.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
* @param <T> the callback result type
* @deprecated as of 6.0, with no concrete replacement
*/
@Deprecated
public class ListenableFutureCallbackRegistry<T> {
private final Queue<SuccessCallback<? super T>> successCallbacks = new ArrayDeque<>(1);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,7 +29,9 @@ import org.springframework.lang.Nullable;
* @author Arjen Poutsma
* @since 4.0
* @param <T> the result type returned by this Future's {@code get} method
* @deprecated as of 6.0, with no concrete replacement
*/
@Deprecated
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {
private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<>();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,7 +27,9 @@ import reactor.core.publisher.Mono;
* @author Stephane Maldini
* @since 5.1
* @param <T> the object type
* @deprecated as of 6.0, in favor of {@link Mono#toFuture()}
*/
@Deprecated
public class MonoToListenableFutureAdapter<T> extends CompletableToListenableFutureAdapter<T> {
public MonoToListenableFutureAdapter(Mono<T> mono) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -36,7 +36,9 @@ import org.springframework.util.Assert;
* @author Juergen Hoeller
* @since 4.1
* @param <T> the result type returned by this Future's {@code get} method
* @deprecated as of 6.0, in favor of {@link CompletableFuture}
*/
@Deprecated
public class SettableListenableFuture<T> implements ListenableFuture<T> {
private static final Callable<Object> DUMMY_CALLABLE = () -> {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.util.concurrent;
import java.util.function.BiConsumer;
import org.springframework.lang.Nullable;
/**
@ -24,7 +26,10 @@ import org.springframework.lang.Nullable;
* @author Sebastien Deleuze
* @since 4.1
* @param <T> the result type
* @deprecated as of 6.0, in favor of
* {@link java.util.concurrent.CompletableFuture#whenComplete(BiConsumer)}
*/
@Deprecated
@FunctionalInterface
public interface SuccessCallback<T> {

View File

@ -0,0 +1,116 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/**
* @author Arjen Poutsma
*/
class FutureUtilsTests {
@Test
void callAsyncNormal() throws ExecutionException, InterruptedException {
String foo = "Foo";
CompletableFuture<String> future = FutureUtils.callAsync(() -> foo);
assertThat(future.get()).isEqualTo(foo);
assertThat(future.isCancelled()).isFalse();
assertThat(future.isDone()).isTrue();
CountDownLatch latch = new CountDownLatch(1);
future.whenComplete((s, throwable) -> {
assertThat(s).isEqualTo(foo);
assertThat(throwable).isNull();
latch.countDown();
});
latch.await();
}
@Test
void callAsyncException() throws InterruptedException {
RuntimeException ex = new RuntimeException("Foo");
CompletableFuture<String> future = FutureUtils.callAsync(() -> {
throw ex;
});
assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(future::get)
.withCause(ex);
assertThat(future.isCancelled()).isFalse();
assertThat(future.isDone()).isTrue();
CountDownLatch latch = new CountDownLatch(1);
future.whenComplete((s, throwable) -> {
assertThat(s).isNull();
assertThat(throwable).isInstanceOf(CompletionException.class)
.hasCause(ex);
latch.countDown();
});
latch.await();
}
@Test
void callAsyncNormalExecutor() throws ExecutionException, InterruptedException {
String foo = "Foo";
CompletableFuture<String> future = FutureUtils.callAsync(() -> foo, new SimpleAsyncTaskExecutor());
assertThat(future.get()).isEqualTo(foo);
assertThat(future.isCancelled()).isFalse();
assertThat(future.isDone()).isTrue();
CountDownLatch latch = new CountDownLatch(1);
future.whenComplete((s, throwable) -> {
assertThat(s).isEqualTo(foo);
assertThat(throwable).isNull();
latch.countDown();
});
latch.await();
}
@Test
void callAsyncExceptionExecutor() throws InterruptedException {
RuntimeException ex = new RuntimeException("Foo");
CompletableFuture<String> future = FutureUtils.callAsync(() -> {
throw ex;
}, new SimpleAsyncTaskExecutor());
assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(future::get)
.withCause(ex);
assertThat(future.isCancelled()).isFalse();
assertThat(future.isDone()).isTrue();
CountDownLatch latch = new CountDownLatch(1);
future.whenComplete((s, throwable) -> {
assertThat(s).isNull();
assertThat(throwable).isInstanceOf(CompletionException.class)
.hasCause(ex);
latch.countDown();
});
latch.await();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,7 +26,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -54,8 +56,6 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* Abstract base class for HandlerMethod-based message handling. Provides most of
@ -571,9 +571,9 @@ public abstract class AbstractMethodMessageHandler<T>
return;
}
if (returnValue != null && this.returnValueHandlers.isAsyncReturnValue(returnValue, returnType)) {
ListenableFuture<?> future = this.returnValueHandlers.toListenableFuture(returnValue, returnType);
CompletableFuture<?> future = this.returnValueHandlers.toCompletableFuture(returnValue, returnType);
if (future != null) {
future.addCallback(new ReturnValueListenableFutureCallback(invocable, message));
future.whenComplete(new ReturnValueListenableFutureCallback(invocable, message));
}
}
else {
@ -704,7 +704,7 @@ public abstract class AbstractMethodMessageHandler<T>
}
private class ReturnValueListenableFutureCallback implements ListenableFutureCallback<Object> {
private class ReturnValueListenableFutureCallback implements BiConsumer<Object, Throwable> {
private final InvocableHandlerMethod handlerMethod;
@ -716,21 +716,21 @@ public abstract class AbstractMethodMessageHandler<T>
}
@Override
public void onSuccess(@Nullable Object result) {
try {
MethodParameter returnType = this.handlerMethod.getAsyncReturnValueType(result);
returnValueHandlers.handleReturnValue(result, returnType, this.message);
public void accept(@Nullable Object result, @Nullable Throwable ex) {
if (result != null) {
try {
MethodParameter returnType = this.handlerMethod.getAsyncReturnValueType(result);
returnValueHandlers.handleReturnValue(result, returnType, this.message);
}
catch (Throwable throwable) {
handleFailure(throwable);
}
}
catch (Throwable ex) {
else if (ex != null) {
handleFailure(ex);
}
}
@Override
public void onFailure(Throwable ex) {
handleFailure(ex);
}
private void handleFailure(Throwable ex) {
Exception cause = (ex instanceof Exception ? (Exception) ex : new IllegalStateException(ex));
processHandlerMethodException(this.handlerMethod, cause, this.message);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,8 +16,11 @@
package org.springframework.messaging.handler.invocation;
import java.util.concurrent.CompletableFuture;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
/**
@ -37,7 +40,7 @@ public interface AsyncHandlerMethodReturnValueHandler extends HandlerMethodRetur
/**
* Whether the return value represents an asynchronous, Future-like type
* with success and error callbacks. If this method returns {@code true},
* then {@link #toListenableFuture} is invoked next. If it returns
* then {@link #toCompletableFuture} is invoked next. If it returns
* {@code false}, then {@link #handleReturnValue} is called.
* <p><strong>Note:</strong> this method will only be invoked after
* {@link #supportsReturnType(org.springframework.core.MethodParameter)}
@ -61,8 +64,30 @@ public interface AsyncHandlerMethodReturnValueHandler extends HandlerMethodRetur
* @param returnType the type of the return value
* @return the resulting ListenableFuture, or {@code null} in which case
* no further handling will be performed
* @deprecated as of 6.0, in favor of
* {@link #toCompletableFuture(Object, MethodParameter)}
*/
@Deprecated
@Nullable
default ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
CompletableFuture<?> result = toCompletableFuture(returnValue, returnType);
return (result != null) ? new CompletableToListenableFutureAdapter<>(result) : null;
}
/**
* Adapt the asynchronous return value to a {@link CompletableFuture}.
* Return value handling will then continue when
* the CompletableFuture is completed with either success or error.
* <p><strong>Note:</strong> this method will only be invoked after
* {@link #supportsReturnType(org.springframework.core.MethodParameter)}
* is called and it returns {@code true}.
* @param returnValue the value returned from the handler method
* @param returnType the type of the return value
* @return the resulting CompletableFuture, or {@code null} in which case
* no further handling will be performed
* @since 6.0
*/
@Nullable
ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType);
CompletableFuture<?> toCompletableFuture(Object returnValue, MethodParameter returnType);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,8 +20,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.springframework.core.MethodParameter;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
/**
* Support for {@link CompletableFuture} (and as of 4.3.7 also {@link CompletionStage})
@ -39,9 +37,7 @@ public class CompletableFutureReturnValueHandler extends AbstractAsyncReturnValu
}
@Override
@SuppressWarnings("unchecked")
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
return new CompletableToListenableFutureAdapter<>((CompletionStage<Object>) returnValue);
public CompletableFuture<?> toCompletableFuture(Object returnValue, MethodParameter returnType) {
return ((CompletionStage<?>) returnValue).toCompletableFuture();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.messaging.handler.invocation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -26,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.concurrent.ListenableFuture;
/**
* A HandlerMethodReturnValueHandler that wraps and delegates to others.
@ -135,12 +135,11 @@ public class HandlerMethodReturnValueHandlerComposite implements AsyncHandlerMet
@Override
@Nullable
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
public CompletableFuture<?> toCompletableFuture(Object returnValue, MethodParameter returnType) {
HandlerMethodReturnValueHandler handler = getReturnValueHandler(returnType);
if (handler instanceof AsyncHandlerMethodReturnValueHandler) {
return ((AsyncHandlerMethodReturnValueHandler) handler).toListenableFuture(returnValue, returnType);
return ((AsyncHandlerMethodReturnValueHandler) handler).toCompletableFuture(returnValue, returnType);
}
return null;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.messaging.handler.invocation;
import java.util.concurrent.CompletableFuture;
import org.springframework.core.MethodParameter;
import org.springframework.util.concurrent.ListenableFuture;
@ -24,7 +26,9 @@ import org.springframework.util.concurrent.ListenableFuture;
*
* @author Sebastien Deleuze
* @since 4.2
* @deprecated as of 6.0, in favor of {@link CompletableFutureReturnValueHandler}
*/
@Deprecated
public class ListenableFutureReturnValueHandler extends AbstractAsyncReturnValueHandler {
@Override
@ -38,4 +42,8 @@ public class ListenableFutureReturnValueHandler extends AbstractAsyncReturnValue
return (ListenableFuture<?>) returnValue;
}
@Override
public CompletableFuture<?> toCompletableFuture(Object returnValue, MethodParameter returnType) {
return ((ListenableFuture<?>) returnValue).completable();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,13 +16,13 @@
package org.springframework.messaging.handler.invocation;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
/**
* Support for single-value reactive types (like {@code Mono} or {@code Single})
@ -57,12 +57,11 @@ public class ReactiveReturnValueHandler extends AbstractAsyncReturnValueHandler
}
@Override
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
public CompletableFuture<?> toCompletableFuture(Object returnValue, MethodParameter returnType) {
ReactiveAdapter adapter = this.adapterRegistry.getAdapter(returnType.getParameterType(), returnValue);
if (adapter != null) {
return new MonoToListenableFutureAdapter<>(Mono.from(adapter.toPublisher(returnValue)));
return Mono.from(adapter.toPublisher(returnValue)).toFuture();
}
return null;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -326,6 +326,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
return resolvers;
}
@SuppressWarnings("deprecation")
@Override
protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,9 @@
package org.springframework.messaging.simp.stomp;
import java.util.concurrent.CompletableFuture;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
/**
@ -33,7 +36,17 @@ public interface ConnectionHandlingStompSession extends StompSession, StompTcpCo
/**
* Return a future that will complete when the session is ready for use.
* @deprecated as of 6.0, in favor of {@link #getSession()}
*/
ListenableFuture<StompSession> getSessionFuture();
@Deprecated
default ListenableFuture<StompSession> getSessionFuture() {
return new CompletableToListenableFutureAdapter<>(getSession());
}
/**
* Return a future that will complete when the session is ready for use.
* @since 6.0
*/
CompletableFuture<StompSession> getSession();
}

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
@ -47,9 +48,6 @@ import org.springframework.util.Assert;
import org.springframework.util.IdGenerator;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;
/**
* Default implementation of {@link ConnectionHandlingStompSession}.
@ -85,7 +83,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
private final StompHeaders connectHeaders;
private final SettableListenableFuture<StompSession> sessionFuture = new SettableListenableFuture<>();
private final CompletableFuture<StompSession> sessionFuture = new CompletableFuture<>();
private MessageConverter converter = new SimpleMessageConverter();
@ -149,7 +147,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
@Override
public ListenableFuture<StompSession> getSessionFuture() {
public CompletableFuture<StompSession> getSession() {
return this.sessionFuture;
}
@ -289,7 +287,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
TcpConnection<byte[]> conn = this.connection;
Assert.state(conn != null, "Connection closed");
try {
conn.send(message).get();
conn.sendAsync(message).get();
}
catch (ExecutionException ex) {
throw new MessageDeliveryException(message, ex.getCause());
@ -407,7 +405,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
if (logger.isDebugEnabled()) {
logger.debug("Failed to connect session id=" + this.sessionId, ex);
}
this.sessionFuture.setException(ex);
this.sessionFuture.completeExceptionally(ex);
this.sessionHandler.handleTransportError(this, ex);
}
@ -450,7 +448,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
else if (StompCommand.CONNECTED.equals(command)) {
initHeartbeatTasks(headers);
this.version = headers.getFirst("version");
this.sessionFuture.set(this);
this.sessionFuture.complete(this);
this.sessionHandler.afterConnected(this, headers);
}
else if (StompCommand.ERROR.equals(command)) {
@ -506,7 +504,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override
public void handleFailure(Throwable ex) {
try {
this.sessionFuture.setException(ex); // no-op if already set
this.sessionFuture.completeExceptionally(ex); // no-op if already set
this.sessionHandler.handleTransportError(this, ex);
}
catch (Throwable ex2) {
@ -698,16 +696,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
public void run() {
TcpConnection<byte[]> conn = connection;
if (conn != null) {
conn.send(HEARTBEAT).addCallback(
new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
}
@Override
public void onFailure(Throwable ex) {
handleFailure(ex);
}
});
conn.sendAsync(HEARTBEAT).whenComplete((unused, throwable) -> {
if (throwable != null) {
handleFailure(throwable);
}
});
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,11 +16,14 @@
package org.springframework.messaging.simp.stomp;
import java.util.concurrent.CompletableFuture;
import org.springframework.lang.Nullable;
import org.springframework.messaging.simp.SimpLogging;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
/**
@ -71,9 +74,22 @@ public class ReactorNettyTcpStompClient extends StompClientSupport {
* on the STOMP level.
* @param handler the handler for the STOMP session
* @return a ListenableFuture for access to the session when ready for use
* @deprecated as of 6.0, in favor of {@link #connectAsync(StompSessionHandler)}
*/
@Deprecated
public ListenableFuture<StompSession> connect(StompSessionHandler handler) {
return connect(null, handler);
return new CompletableToListenableFutureAdapter<>(connectAsync(handler));
}
/**
* Connect and notify the given {@link StompSessionHandler} when connected
* on the STOMP level.
* @param handler the handler for the STOMP session
* @return a ListenableFuture for access to the session when ready for use
* @since 6.0
*/
public CompletableFuture<StompSession> connectAsync(StompSessionHandler handler) {
return connectAsync(null, handler);
}
/**
@ -82,18 +98,33 @@ public class ReactorNettyTcpStompClient extends StompClientSupport {
* @param connectHeaders headers to add to the CONNECT frame
* @param handler the handler for the STOMP session
* @return a ListenableFuture for access to the session when ready for use
* @deprecated as of 6.0, in favor of {@link #connectAsync(StompHeaders, StompSessionHandler)}
*/
@Deprecated
public ListenableFuture<StompSession> connect(@Nullable StompHeaders connectHeaders, StompSessionHandler handler) {
ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
this.tcpClient.connect(session);
this.tcpClient.connectAsync(session);
return session.getSessionFuture();
}
/**
* An overloaded version of {@link #connectAsync(StompSessionHandler)} that
* accepts headers to use for the STOMP CONNECT frame.
* @param connectHeaders headers to add to the CONNECT frame
* @param handler the handler for the STOMP session
* @return a CompletableFuture for access to the session when ready for use
*/
public CompletableFuture<StompSession> connectAsync(@Nullable StompHeaders connectHeaders, StompSessionHandler handler) {
ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
this.tcpClient.connectAsync(session);
return session.getSession();
}
/**
* Shut down the client and release resources.
*/
public void shutdown() {
this.tcpClient.shutdown();
this.tcpClient.shutdownAsync();
}
@Override

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -47,9 +48,6 @@ import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;
/**
* A {@link org.springframework.messaging.MessageHandler} that handles messages by
@ -98,14 +96,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable());
private static final CompletableFuture<Void> EMPTY_TASK = CompletableFuture.completedFuture(null);
private static final StompHeaderAccessor HEART_BEAT_ACCESSOR;
private static final Message<byte[]> HEARTBEAT_MESSAGE;
static {
EMPTY_TASK.run();
HEART_BEAT_ACCESSOR = StompHeaderAccessor.createForHeartbeat();
HEARTBEAT_MESSAGE = MessageBuilder.createMessage(
StompDecoder.HEARTBEAT_PAYLOAD, HEART_BEAT_ACCESSOR.getMessageHeaders());
@ -455,7 +452,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.connectionHandlers.put(handler.getSessionId(), handler);
this.stats.incrementConnectCount();
this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
this.tcpClient.connectAsync(handler, new FixedIntervalReconnectStrategy(5000));
if (this.taskScheduler != null) {
this.taskScheduler.scheduleWithFixedDelay(new ClientSendMessageCountTask(), Duration.ofMillis(5000));
@ -478,7 +475,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
publishBrokerUnavailableEvent();
if (this.tcpClient != null) {
try {
this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS);
this.tcpClient.shutdownAsync().get(5000, TimeUnit.MILLISECONDS);
}
catch (Throwable ex) {
logger.error("Error in shutdown of TCP client", ex);
@ -572,7 +569,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.connectionHandlers.put(sessionId, handler);
this.stats.incrementConnectCount();
Assert.state(this.tcpClient != null, "No TCP client available");
this.tcpClient.connect(handler);
this.tcpClient.connectAsync(handler);
}
else if (StompCommand.DISCONNECT.equals(command)) {
RelayConnectionHandler handler = this.connectionHandlers.get(sessionId);
@ -691,7 +688,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
}
}, MAX_TIME_TO_CONNECTED_FRAME);
connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
connection.sendAsync(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
}
@Override
@ -865,7 +862,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* @return a future to wait for the result
*/
@SuppressWarnings("unchecked")
public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
public CompletableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
TcpConnection<byte[]> conn = this.tcpConnection;
if (!this.isStompConnected || conn == null) {
@ -901,19 +898,17 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
}
ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
future.addCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
CompletableFuture<Void> future = conn.sendAsync((Message<byte[]>) messageToSend);
future.whenComplete((unused, throwable) -> {
if (throwable == null) {
if (accessor.getCommand() == StompCommand.DISCONNECT) {
afterDisconnectSent(accessor);
}
}
@Override
public void onFailure(Throwable ex) {
if (tcpConnection != null) {
else {
if (this.tcpConnection != null) {
handleTcpConnectionFailure("failed to forward " +
accessor.getShortLogMessage(message.getPayload()), ex);
accessor.getShortLogMessage(message.getPayload()), throwable);
}
else if (logger.isErrorEnabled()) {
logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload()));
@ -1005,10 +1000,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (clientSendInterval > 0 && serverReceiveInterval > 0) {
long interval = Math.max(clientSendInterval, serverReceiveInterval);
con.onWriteInactivity(() ->
con.send(HEARTBEAT_MESSAGE).addCallback(
result -> {},
ex -> handleTcpConnectionFailure(
"Failed to forward heartbeat: " + ex.getMessage(), ex)), interval);
con.sendAsync(HEARTBEAT_MESSAGE).whenComplete((unused, ex) -> {
if (ex != null) {
handleTcpConnectionFailure("Failed to forward heartbeat: " + ex.getMessage(), ex);
}
}), interval);
}
if (clientReceiveInterval > 0 && serverSendInterval > 0) {
final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
@ -1029,12 +1025,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
TcpConnection<byte[]> conn = getTcpConnection();
if (conn != null) {
MessageHeaders headers = accessor.getMessageHeaders();
conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
result -> {},
ex -> {
String error = "Failed to subscribe in \"system\" session.";
handleTcpConnectionFailure(error, ex);
});
conn.sendAsync(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).whenComplete((unused, ex) -> {
if (ex != null) {
handleTcpConnectionFailure("Failed to subscribe in \"system\" session.", ex);
}
});
}
}
}
@ -1083,9 +1078,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
public CompletableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
try {
ListenableFuture<Void> future = super.forward(message, accessor);
CompletableFuture<Void> future = super.forward(message, accessor);
if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
future.get();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,8 +17,10 @@
package org.springframework.messaging.tcp;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import org.springframework.messaging.Message;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
/**
@ -35,8 +37,21 @@ public interface TcpConnection<P> extends Closeable {
* @param message the message
* @return a ListenableFuture that can be used to determine when and if the
* message was successfully sent
* @deprecated as of 6.0, in favor of {@link #sendAsync(Message)}
*/
ListenableFuture<Void> send(Message<P> message);
@Deprecated
default ListenableFuture<Void> send(Message<P> message) {
return new CompletableToListenableFutureAdapter<>(sendAsync(message));
}
/**
* Send the given message.
* @param message the message
* @return a CompletableFuture that can be used to determine when and if the
* message was successfully sent
* @since 6.0
*/
CompletableFuture<Void> sendAsync(Message<P> message);
/**
* Register a task to invoke after a period of read inactivity.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,9 @@
package org.springframework.messaging.tcp;
import java.util.concurrent.CompletableFuture;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
/**
@ -32,8 +35,21 @@ public interface TcpOperations<P> {
* @param connectionHandler a handler to manage the connection
* @return a ListenableFuture that can be used to determine when and if the
* connection is successfully established
* @deprecated as of 6.0, in favor of {@link #connectAsync(TcpConnectionHandler)}
*/
ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler);
@Deprecated
default ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) {
return new CompletableToListenableFutureAdapter<>(connectAsync(connectionHandler));
}
/**
* Open a new connection.
* @param connectionHandler a handler to manage the connection
* @return a CompletableFuture that can be used to determine when and if the
* connection is successfully established
* @since 6.0
*/
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler);
/**
* Open a new connection and a strategy for reconnecting if the connection fails.
@ -41,14 +57,40 @@ public interface TcpOperations<P> {
* @param reconnectStrategy a strategy for reconnecting
* @return a ListenableFuture that can be used to determine when and if the
* initial connection is successfully established
* @deprecated as of 6.0, in favor of {@link #connectAsync(TcpConnectionHandler, ReconnectStrategy)}
*/
ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
@Deprecated
default ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy) {
return new CompletableToListenableFutureAdapter<>(connectAsync(connectionHandler, reconnectStrategy));
}
/**
* Open a new connection and a strategy for reconnecting if the connection fails.
* @param connectionHandler a handler to manage the connection
* @param reconnectStrategy a strategy for reconnecting
* @return a CompletableFuture that can be used to determine when and if the
* initial connection is successfully established
* @since 6.0
*/
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
/**
* Shut down and close any open connections.
* @return a ListenableFuture that can be used to determine when and if the
* connection is successfully closed
* @deprecated as of 6.0, in favor of {@link #shutdownAsync()}
*/
ListenableFuture<Void> shutdown();
@Deprecated
default ListenableFuture<Void> shutdown() {
return new CompletableToListenableFutureAdapter<>(shutdownAsync());
}
/**
* Shut down and close any open connections.
* @return a ListenableFuture that can be used to determine when and if the
* connection is successfully closed
* @since 6.0
*/
CompletableFuture<Void> shutdownAsync();
}

View File

@ -52,10 +52,6 @@ import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
import org.springframework.util.concurrent.SettableListenableFuture;
/**
* Reactor Netty based implementation of {@link TcpOperations}.
@ -179,20 +175,18 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler) {
Assert.notNull(handler, "TcpConnectionHandler is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
Mono<Void> connectMono = extendTcpClient(this.tcpClient, handler)
return extendTcpClient(this.tcpClient, handler)
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnError(handler::afterConnectFailure)
.then();
return new MonoToListenableFutureAdapter<>(connectMono);
.then().toFuture();
}
/**
@ -209,7 +203,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
@ -234,14 +228,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
.scan(1, (count, element) -> count++)
.flatMap(attempt -> reconnect(attempt, strategy)))
.subscribe();
return new CompletableToListenableFutureAdapter<>(connectFuture);
return connectFuture;
}
private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
private CompletableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
handler.afterConnectFailure(ex);
return new MonoToListenableFutureAdapter<>(Mono.error(ex));
return Mono.<Void>error(ex).toFuture();
}
private Publisher<? extends Long> reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) {
@ -250,11 +243,9 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
@Override
public ListenableFuture<Void> shutdown() {
public CompletableFuture<Void> shutdownAsync() {
if (this.stopping) {
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
future.set(null);
return future;
return CompletableFuture.completedFuture(null);
}
this.stopping = true;
@ -274,7 +265,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
result = stopScheduler();
}
return new MonoToListenableFutureAdapter<>(result);
return result.toFuture();
}
private Mono<Void> stopScheduler() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.messaging.tcp.reactor;
import java.util.concurrent.CompletableFuture;
import io.netty.buffer.ByteBuf;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
@ -24,8 +26,6 @@ import reactor.netty.NettyOutbound;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
/**
* Reactor Netty based implementation of {@link TcpConnection}.
@ -56,11 +56,12 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
@Override
public ListenableFuture<Void> send(Message<P> message) {
public CompletableFuture<Void> sendAsync(Message<P> message) {
ByteBuf byteBuf = this.outbound.alloc().buffer();
this.codec.encode(message, byteBuf);
Mono<Void> sendCompletion = this.outbound.send(Mono.just(byteBuf)).then();
return new MonoToListenableFutureAdapter<>(sendCompletion);
return this.outbound.send(Mono.just(byteBuf))
.then()
.toFuture();
}
@Override

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -535,6 +535,7 @@ public class SimpAnnotationMethodMessageHandlerTests {
@Controller
@MessageMapping("listenable-future")
@SuppressWarnings("deprecation")
private static class ListenableFutureController {
private ListenableFutureTask<String> future;

View File

@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReference;
@ -45,7 +46,6 @@ import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.concurrent.SettableListenableFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@ -90,9 +90,9 @@ public class DefaultStompSessionTests {
new CompositeMessageConverter(
Arrays.asList(new StringMessageConverter(), new ByteArrayMessageConverter())));
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
future.set(null);
given(this.connection.send(this.messageCaptor.capture())).willReturn(future);
CompletableFuture<Void> future = new CompletableFuture<>();
future.complete(null);
given(this.connection.sendAsync(this.messageCaptor.capture())).willReturn(future);
}
@ -177,7 +177,7 @@ public class DefaultStompSessionTests {
@Test
public void heartbeatNotSupportedByServer() {
this.session.afterConnected(this.connection);
verify(this.connection).send(any());
verify(this.connection).sendAsync(any());
this.connectHeaders.setHeartbeat(new long[] {10000, 10000});
@ -193,7 +193,7 @@ public class DefaultStompSessionTests {
@Test
public void heartbeatTasks() {
this.session.afterConnected(this.connection);
verify(this.connection).send(any());
verify(this.connection).sendAsync(any());
this.connectHeaders.setHeartbeat(new long[] {10000, 10000});
@ -216,7 +216,7 @@ public class DefaultStompSessionTests {
writeTask.run();
StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
Message<byte[]> message = MessageBuilder.createMessage(new byte[] {'\n'}, accessor.getMessageHeaders());
verify(this.connection).send(eq(message));
verify(this.connection).sendAsync(eq(message));
verifyNoMoreInteractions(this.connection);
reset(this.sessionHandler);
@ -435,10 +435,9 @@ public class DefaultStompSessionTests {
assertThat(this.session.isConnected()).isTrue();
IllegalStateException exception = new IllegalStateException("simulated exception");
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
future.setException(exception);
CompletableFuture<Void> future = CompletableFuture.failedFuture(exception);
given(this.connection.send(any())).willReturn(future);
given(this.connection.sendAsync(any())).willReturn(future);
assertThatExceptionOfType(MessageDeliveryException.class).isThrownBy(() ->
this.session.send("/topic/foo", "sample payload".getBytes(StandardCharsets.UTF_8)))
.withCause(exception);

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -38,7 +39,6 @@ import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.StompSession.Subscription;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import static org.assertj.core.api.Assertions.assertThat;
@ -107,10 +107,10 @@ public class ReactorNettyTcpStompClientTests {
public void publishSubscribe() throws Exception {
String destination = "/topic/foo";
ConsumingHandler consumingHandler1 = new ConsumingHandler(destination);
ListenableFuture<StompSession> consumerFuture1 = this.client.connect(consumingHandler1);
CompletableFuture<StompSession> consumerFuture1 = this.client.connectAsync(consumingHandler1);
ConsumingHandler consumingHandler2 = new ConsumingHandler(destination);
ListenableFuture<StompSession> consumerFuture2 = this.client.connect(consumingHandler2);
CompletableFuture<StompSession> consumerFuture2 = this.client.connectAsync(consumingHandler2);
assertThat(consumingHandler1.awaitForSubscriptions(5000)).isTrue();
assertThat(consumingHandler2.awaitForSubscriptions(5000)).isTrue();
@ -118,7 +118,7 @@ public class ReactorNettyTcpStompClientTests {
ProducingHandler producingHandler = new ProducingHandler();
producingHandler.addToSend(destination, "foo1");
producingHandler.addToSend(destination, "foo2");
ListenableFuture<StompSession> producerFuture = this.client.connect(producingHandler);
CompletableFuture<StompSession> producerFuture = this.client.connectAsync(producingHandler);
assertThat(consumingHandler1.awaitForMessageCount(2, 5000)).isTrue();
assertThat(consumingHandler1.getReceived()).containsExactly("foo1", "foo2");

View File

@ -20,6 +20,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -39,8 +40,6 @@ import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
@ -309,10 +308,8 @@ class StompBrokerRelayMessageHandlerTests {
}
private static ListenableFutureTask<Void> getVoidFuture() {
ListenableFutureTask<Void> futureTask = new ListenableFutureTask<>(() -> null);
futureTask.run();
return futureTask;
private static CompletableFuture<Void> getVoidFuture() {
return CompletableFuture.completedFuture(null);
}
@ -336,21 +333,22 @@ class StompBrokerRelayMessageHandlerTests {
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler) {
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<byte[]> handler) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<byte[]> handler,
ReconnectStrategy reconnectStrategy) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
@Override
public ListenableFuture<Void> shutdown() {
public CompletableFuture<Void> shutdownAsync() {
return getVoidFuture();
}
@ -371,7 +369,7 @@ class StompBrokerRelayMessageHandlerTests {
}
@Override
public ListenableFuture<Void> send(Message<byte[]> message) {
public CompletableFuture<Void> sendAsync(Message<byte[]> message) {
this.messages.add(message);
return getVoidFuture();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -148,7 +148,7 @@ final class DefaultAsyncServerResponse extends ErrorHandlingServerResponse imple
else {
result = new DeferredResult<>();
}
this.futureResponse.handle((value, ex) -> {
this.futureResponse.whenComplete((value, ex) -> {
if (ex != null) {
if (ex instanceof CompletionException && ex.getCause() != null) {
ex = ex.getCause();
@ -164,7 +164,6 @@ final class DefaultAsyncServerResponse extends ErrorHandlingServerResponse imple
else {
result.setResult(value);
}
return null;
});
return result;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -366,7 +366,7 @@ final class DefaultEntityResponseBuilder<T> implements EntityResponse.Builder<T>
Context context) {
DeferredResult<ServerResponse> result = new DeferredResult<>();
entity().handle((value, ex) -> {
entity().whenComplete((value, ex) -> {
if (ex != null) {
if (ex instanceof CompletionException && ex.getCause() != null) {
ex = ex.getCause();
@ -388,7 +388,6 @@ final class DefaultEntityResponseBuilder<T> implements EntityResponse.Builder<T>
result.setErrorResult(writeException);
}
}
return null;
});
return result;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,6 @@ package org.springframework.web.servlet.mvc.method.annotation;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
@ -39,6 +38,7 @@ import org.springframework.web.method.support.ModelAndViewContainer;
*/
public class DeferredResultMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
@SuppressWarnings("deprecation")
@Override
public boolean supportsReturnType(MethodParameter returnType) {
Class<?> type = returnType.getParameterType();
@ -47,6 +47,7 @@ public class DeferredResultMethodReturnValueHandler implements HandlerMethodRetu
CompletionStage.class.isAssignableFrom(type));
}
@SuppressWarnings("deprecation")
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
@ -75,6 +76,7 @@ public class DeferredResultMethodReturnValueHandler implements HandlerMethodRetu
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}
@SuppressWarnings("deprecation")
private DeferredResult<Object> adaptListenableFuture(ListenableFuture<?> future) {
DeferredResult<Object> result = new DeferredResult<>();
future.addCallback(new ListenableFutureCallback<Object>() {
@ -92,7 +94,7 @@ public class DeferredResultMethodReturnValueHandler implements HandlerMethodRetu
private DeferredResult<Object> adaptCompletionStage(CompletionStage<?> future) {
DeferredResult<Object> result = new DeferredResult<>();
future.handle((BiFunction<Object, Throwable, Object>) (value, ex) -> {
future.whenComplete((value, ex) -> {
if (ex != null) {
if (ex instanceof CompletionException && ex.getCause() != null) {
ex = ex.getCause();
@ -102,7 +104,6 @@ public class DeferredResultMethodReturnValueHandler implements HandlerMethodRetu
else {
result.setResult(value);
}
return null;
});
return result;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -89,6 +89,7 @@ public class DeferredResultReturnValueHandlerTests {
}
@Test
@SuppressWarnings("deprecation")
public void listenableFuture() throws Exception {
SettableListenableFuture<String> future = new SettableListenableFuture<>();
testHandle(future, ListenableFuture.class, () -> future.set("foo"), "foo");
@ -107,6 +108,7 @@ public class DeferredResultReturnValueHandlerTests {
}
@Test
@SuppressWarnings("deprecation")
public void listenableFutureWithError() throws Exception {
SettableListenableFuture<String> future = new SettableListenableFuture<>();
IllegalStateException ex = new IllegalStateException();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -63,16 +64,16 @@ public abstract class AbstractWebSocketClient implements WebSocketClient {
@Override
public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
public CompletableFuture<WebSocketSession> execute(WebSocketHandler webSocketHandler,
String uriTemplate, Object... uriVars) {
Assert.notNull(uriTemplate, "'uriTemplate' must not be null");
URI uri = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode().toUri();
return doHandshake(webSocketHandler, null, uri);
return execute(webSocketHandler, null, uri);
}
@Override
public final ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
public final CompletableFuture<WebSocketSession> execute(WebSocketHandler webSocketHandler,
@Nullable WebSocketHttpHeaders headers, URI uri) {
Assert.notNull(webSocketHandler, "WebSocketHandler must not be null");
@ -96,7 +97,7 @@ public abstract class AbstractWebSocketClient implements WebSocketClient {
List<WebSocketExtension> extensions =
(headers != null ? headers.getSecWebSocketExtensions() : Collections.emptyList());
return doHandshakeInternal(webSocketHandler, headersToUse, uri, subProtocols, extensions,
return executeInternal(webSocketHandler, headersToUse, uri, subProtocols, extensions,
Collections.emptyMap());
}
@ -119,8 +120,28 @@ public abstract class AbstractWebSocketClient implements WebSocketClient {
* @param attributes the attributes to associate with the WebSocketSession, i.e. via
* {@link WebSocketSession#getAttributes()}; currently always an empty map.
* @return the established WebSocket session wrapped in a ListenableFuture.
* @deprecated as of 6.0, in favor of {@link #executeInternal(WebSocketHandler, HttpHeaders, URI, List, List, Map)}
*/
protected abstract ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,
@Deprecated
protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,
HttpHeaders headers, URI uri, List<String> subProtocols, List<WebSocketExtension> extensions,
Map<String, Object> attributes) {
throw new UnsupportedOperationException("doHandshakeInternal is deprecated in favor of executeInternal");
}
/**
* Perform the actual handshake to establish a connection to the server.
* @param webSocketHandler the client-side handler for WebSocket messages
* @param headers the HTTP headers to use for the handshake, with unwanted (forbidden)
* headers filtered out (never {@code null})
* @param uri the target URI for the handshake (never {@code null})
* @param subProtocols requested sub-protocols, or an empty list
* @param extensions requested WebSocket extensions, or an empty list
* @param attributes the attributes to associate with the WebSocketSession, i.e. via
* {@link WebSocketSession#getAttributes()}; currently always an empty map.
* @return the established WebSocket session wrapped in a ListenableFuture.
*/
protected abstract CompletableFuture<WebSocketSession> executeInternal(WebSocketHandler webSocketHandler,
HttpHeaders headers, URI uri, List<String> subProtocols, List<WebSocketExtension> extensions,
Map<String, Object> attributes);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,8 +17,10 @@
package org.springframework.web.socket.client;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketHttpHeaders;
@ -35,10 +37,56 @@ import org.springframework.web.socket.WebSocketSession;
*/
public interface WebSocketClient {
ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
String uriTemplate, Object... uriVariables);
/**
* Execute a handshake request to the given url and handle the resulting
* WebSocket session with the given handler.
* @param webSocketHandler the session handler
* @param uriTemplate the url template
* @param uriVariables the variables to expand the template
* @return a future that completes when the session is available
* @deprecated as of 6.0, in favor of {@link #execute(WebSocketHandler, String, Object...)}
*/
@Deprecated
default ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
String uriTemplate, Object... uriVariables) {
return new CompletableToListenableFutureAdapter<>(execute(webSocketHandler, uriTemplate, uriVariables));
}
ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
/**
* Execute a handshake request to the given url and handle the resulting
* WebSocket session with the given handler.
* @param webSocketHandler the session handler
* @param uriTemplate the url template
* @param uriVariables the variables to expand the template
* @return a future that completes when the session is available
* @since 6.0
*/
CompletableFuture<WebSocketSession> execute(WebSocketHandler webSocketHandler,
String uriTemplate, Object... uriVariables);
/**
* Execute a handshake request to the given url and handle the resulting
* WebSocket session with the given handler.
* @param webSocketHandler the session handler
* @param uri the url
* @return a future that completes when the session is available
* @deprecated as of 6.0, in favor of {@link #execute(WebSocketHandler, WebSocketHttpHeaders, URI)}
*/
@Deprecated
default ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
@Nullable WebSocketHttpHeaders headers, URI uri) {
return new CompletableToListenableFutureAdapter<>(execute(webSocketHandler, headers, uri));
}
/**
* Execute a handshake request to the given url and handle the resulting
* WebSocket session with the given handler.
* @param webSocketHandler the session handler
* @param uri the url
* @return a future that completes when the session is available
* @since 6.0
*/
CompletableFuture<WebSocketSession> execute(WebSocketHandler webSocketHandler,
@Nullable WebSocketHttpHeaders headers, URI uri);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,12 +17,11 @@
package org.springframework.web.socket.client;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.context.Lifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
@ -137,17 +136,15 @@ public class WebSocketConnectionManager extends ConnectionManagerSupport {
logger.info("Connecting to WebSocket at " + getUri());
}
ListenableFuture<WebSocketSession> future =
this.client.doHandshake(this.webSocketHandler, this.headers, getUri());
CompletableFuture<WebSocketSession> future =
this.client.execute(this.webSocketHandler, this.headers, getUri());
future.addCallback(new ListenableFutureCallback<WebSocketSession>() {
@Override
public void onSuccess(@Nullable WebSocketSession result) {
webSocketSession = result;
future.whenComplete((result, ex) -> {
if (result != null) {
this.webSocketSession = result;
logger.info("Successfully connected");
}
@Override
public void onFailure(Throwable ex) {
else if (ex != null) {
logger.error("Failed to connect", ex);
}
});

View File

@ -21,6 +21,7 @@ import java.security.Principal;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -30,12 +31,12 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.springframework.context.Lifecycle;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.util.concurrent.FutureUtils;
import org.springframework.web.socket.WebSocketExtension;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
@ -43,8 +44,6 @@ import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter
import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession;
import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter;
import org.springframework.web.socket.client.AbstractWebSocketClient;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
/**
* Initiates WebSocket requests to a WebSocket server programmatically
@ -64,7 +63,7 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Lif
private final org.eclipse.jetty.websocket.client.WebSocketClient client;
@Nullable
private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
/**
@ -90,7 +89,7 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Lif
* {@code doHandshake} methods will block until the connection is established.
* <p>By default an instance of {@code SimpleAsyncTaskExecutor} is used.
*/
public void setTaskExecutor(@Nullable AsyncListenableTaskExecutor taskExecutor) {
public void setTaskExecutor(@Nullable AsyncTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@ -98,7 +97,7 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Lif
* Return the configured {@link TaskExecutor}.
*/
@Nullable
public AsyncListenableTaskExecutor getTaskExecutor() {
public AsyncTaskExecutor getTaskExecutor() {
return this.taskExecutor;
}
@ -130,15 +129,7 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Lif
@Override
public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
String uriTemplate, Object... uriVars) {
UriComponents uriComponents = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode();
return doHandshake(webSocketHandler, null, uriComponents.toUri());
}
@Override
public ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler wsHandler,
public CompletableFuture<WebSocketSession> executeInternal(WebSocketHandler wsHandler,
HttpHeaders headers, final URI uri, List<String> protocols,
List<WebSocketExtension> extensions, Map<String, Object> attributes) {
@ -162,12 +153,10 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Lif
};
if (this.taskExecutor != null) {
return this.taskExecutor.submitListenable(connectTask);
return FutureUtils.callAsync(connectTask, this.taskExecutor);
}
else {
ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<>(connectTask);
task.run();
return task;
return FutureUtils.callAsync(connectTask);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,6 +26,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import jakarta.websocket.ClientEndpointConfig;
import jakarta.websocket.ClientEndpointConfig.Configurator;
@ -36,13 +37,13 @@ import jakarta.websocket.HandshakeResponse;
import jakarta.websocket.WebSocketContainer;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.util.concurrent.FutureUtils;
import org.springframework.web.socket.WebSocketExtension;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
@ -64,7 +65,7 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
private final Map<String,Object> userProperties = new HashMap<>();
@Nullable
private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
/**
@ -113,7 +114,7 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
* {@code doHandshake} methods will block until the connection is established.
* <p>By default, an instance of {@code SimpleAsyncTaskExecutor} is used.
*/
public void setTaskExecutor(@Nullable AsyncListenableTaskExecutor taskExecutor) {
public void setTaskExecutor(@Nullable AsyncTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@ -121,13 +122,13 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
* Return the configured {@link TaskExecutor}.
*/
@Nullable
public AsyncListenableTaskExecutor getTaskExecutor() {
public AsyncTaskExecutor getTaskExecutor() {
return this.taskExecutor;
}
@Override
protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,
protected CompletableFuture<WebSocketSession> executeInternal(WebSocketHandler webSocketHandler,
HttpHeaders headers, final URI uri, List<String> protocols,
List<WebSocketExtension> extensions, Map<String, Object> attributes) {
@ -153,12 +154,10 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
};
if (this.taskExecutor != null) {
return this.taskExecutor.submitListenable(connectTask);
return FutureUtils.callAsync(connectTask, this.taskExecutor);
}
else {
ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<>(connectTask);
task.run();
return task;
return FutureUtils.callAsync(connectTask);
}
}

View File

@ -23,7 +23,9 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -47,9 +49,8 @@ import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
@ -210,9 +211,25 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
* @param handler the session handler
* @param uriVars the URI variables to expand into the URL
* @return a ListenableFuture for access to the session when ready for use
* @deprecated as of 6.0, in favor of {@link #connectAsync(String, StompSessionHandler, Object...)}
*/
@Deprecated
public ListenableFuture<StompSession> connect(String url, StompSessionHandler handler, Object... uriVars) {
return connect(url, null, handler, uriVars);
return new CompletableToListenableFutureAdapter<>(connectAsync(url, handler, uriVars));
}
/**
* Connect to the given WebSocket URL and notify the given
* {@link org.springframework.messaging.simp.stomp.StompSessionHandler}
* when connected on the STOMP level after the CONNECTED frame is received.
* @param url the url to connect to
* @param handler the session handler
* @param uriVars the URI variables to expand into the URL
* @return a CompletableFuture for access to the session when ready for use
* @since 6.0
*/
public CompletableFuture<StompSession> connectAsync(String url, StompSessionHandler handler, Object... uriVars) {
return connectAsync(url, null, handler, uriVars);
}
/**
@ -224,11 +241,31 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
* @param handler the session handler
* @param uriVariables the URI variables to expand into the URL
* @return a ListenableFuture for access to the session when ready for use
* @deprecated as of 6.0, in favor of {@link #connectAsync(String, WebSocketHttpHeaders, StompSessionHandler, Object...)}
*/
@Deprecated
public ListenableFuture<StompSession> connect(String url, @Nullable WebSocketHttpHeaders handshakeHeaders,
StompSessionHandler handler, Object... uriVariables) {
return connect(url, handshakeHeaders, null, handler, uriVariables);
return new CompletableToListenableFutureAdapter<>(
connectAsync(url, handshakeHeaders, null, handler, uriVariables));
}
/**
* An overloaded version of
* {@link #connect(String, StompSessionHandler, Object...)} that also
* accepts {@link WebSocketHttpHeaders} to use for the WebSocket handshake.
* @param url the url to connect to
* @param handshakeHeaders the headers for the WebSocket handshake
* @param handler the session handler
* @param uriVariables the URI variables to expand into the URL
* @return a ListenableFuture for access to the session when ready for use
* @since 6.0
*/
public CompletableFuture<StompSession> connectAsync(String url, @Nullable WebSocketHttpHeaders handshakeHeaders,
StompSessionHandler handler, Object... uriVariables) {
return connectAsync(url, handshakeHeaders, null, handler, uriVariables);
}
/**
@ -242,13 +279,35 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
* @param handler the session handler
* @param uriVariables the URI variables to expand into the URL
* @return a ListenableFuture for access to the session when ready for use
* @deprecated as of 6.0, in favor of {@link #connectAsync(String, WebSocketHttpHeaders, StompHeaders, StompSessionHandler, Object...)}
*/
@Deprecated
public ListenableFuture<StompSession> connect(String url, @Nullable WebSocketHttpHeaders handshakeHeaders,
@Nullable StompHeaders connectHeaders, StompSessionHandler handler, Object... uriVariables) {
return new CompletableToListenableFutureAdapter<>(
connectAsync(url, handshakeHeaders, connectHeaders, handler, uriVariables));
}
/**
* An overloaded version of
* {@link #connect(String, StompSessionHandler, Object...)} that also accepts
* {@link WebSocketHttpHeaders} to use for the WebSocket handshake and
* {@link StompHeaders} for the STOMP CONNECT frame.
* @param url the url to connect to
* @param handshakeHeaders headers for the WebSocket handshake
* @param connectHeaders headers for the STOMP CONNECT frame
* @param handler the session handler
* @param uriVariables the URI variables to expand into the URL
* @return a CompletableFuture for access to the session when ready for use
* @since 6.0
*/
public CompletableFuture<StompSession> connectAsync(String url, @Nullable WebSocketHttpHeaders handshakeHeaders,
@Nullable StompHeaders connectHeaders, StompSessionHandler handler, Object... uriVariables) {
Assert.notNull(url, "'url' must not be null");
URI uri = UriComponentsBuilder.fromUriString(url).buildAndExpand(uriVariables).encode().toUri();
return connect(uri, handshakeHeaders, connectHeaders, handler);
return connectAsync(uri, handshakeHeaders, connectHeaders, handler);
}
/**
@ -260,17 +319,37 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
* @param connectHeaders headers for the STOMP CONNECT frame
* @param sessionHandler the STOMP session handler
* @return a ListenableFuture for access to the session when ready for use
* @deprecated as of 6.0, in favor of {@link #connectAsync(URI, WebSocketHttpHeaders, StompHeaders, StompSessionHandler)}
*/
@Deprecated
public ListenableFuture<StompSession> connect(URI url, @Nullable WebSocketHttpHeaders handshakeHeaders,
@Nullable StompHeaders connectHeaders, StompSessionHandler sessionHandler) {
return new CompletableToListenableFutureAdapter<>(
connectAsync(url, handshakeHeaders, connectHeaders, sessionHandler));
}
/**
* An overloaded version of
* {@link #connect(String, WebSocketHttpHeaders, StompSessionHandler, Object...)}
* that accepts a fully prepared {@link java.net.URI}.
* @param url the url to connect to
* @param handshakeHeaders the headers for the WebSocket handshake
* @param connectHeaders headers for the STOMP CONNECT frame
* @param sessionHandler the STOMP session handler
* @return a CompletableFuture for access to the session when ready for use
* @since 6.0
*/
public CompletableFuture<StompSession> connectAsync(URI url, @Nullable WebSocketHttpHeaders handshakeHeaders,
@Nullable StompHeaders connectHeaders, StompSessionHandler sessionHandler) {
Assert.notNull(url, "'url' must not be null");
ConnectionHandlingStompSession session = createSession(connectHeaders, sessionHandler);
WebSocketTcpConnectionHandlerAdapter adapter = new WebSocketTcpConnectionHandlerAdapter(session);
getWebSocketClient()
.doHandshake(new LoggingWebSocketHandlerDecorator(adapter), handshakeHeaders, url)
.addCallback(adapter);
return session.getSessionFuture();
.execute(new LoggingWebSocketHandlerDecorator(adapter), handshakeHeaders, url)
.whenComplete(adapter);
return session.getSession();
}
@Override
@ -286,7 +365,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
/**
* Adapt WebSocket to the TcpConnectionHandler and TcpConnection contracts.
*/
private class WebSocketTcpConnectionHandlerAdapter implements ListenableFutureCallback<WebSocketSession>,
private class WebSocketTcpConnectionHandlerAdapter implements BiConsumer<WebSocketSession, Throwable>,
WebSocketHandler, TcpConnection<byte[]> {
private final TcpConnectionHandler<byte[]> connectionHandler;
@ -307,15 +386,13 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
this.connectionHandler = connectionHandler;
}
// ListenableFutureCallback implementation: handshake outcome
// CompletableFuture callback implementation: handshake outcome
@Override
public void onSuccess(@Nullable WebSocketSession webSocketSession) {
}
@Override
public void onFailure(Throwable ex) {
this.connectionHandler.afterConnectFailure(ex);
public void accept(@Nullable WebSocketSession webSocketSession, @Nullable Throwable throwable) {
if (throwable != null) {
this.connectionHandler.afterConnectFailure(throwable);
}
}
// WebSocketHandler implementation
@ -375,17 +452,17 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
// TcpConnection implementation
@Override
public ListenableFuture<Void> send(Message<byte[]> message) {
public CompletableFuture<Void> sendAsync(Message<byte[]> message) {
updateLastWriteTime();
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
try {
WebSocketSession session = this.session;
Assert.state(session != null, "No WebSocketSession available");
session.sendMessage(this.codec.encode(message, session.getClass()));
future.set(null);
future.complete(null);
}
catch (Throwable ex) {
future.setException(ex);
future.completeExceptionally(ex);
}
finally {
updateLastWriteTime();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@ -55,7 +56,7 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
private final WebSocketHandler webSocketHandler;
private final SettableListenableFuture<WebSocketSession> connectFuture;
private final CompletableFuture<WebSocketSession> connectFuture;
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
@ -65,9 +66,18 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
@Nullable
private volatile CloseStatus closeStatus;
/**
* Create a new {@code AbstractClientSockJsSession}.
* @deprecated as of 6.0, in favor of {@link #AbstractClientSockJsSession(TransportRequest, WebSocketHandler, CompletableFuture)}
*/
@Deprecated
protected AbstractClientSockJsSession(TransportRequest request, WebSocketHandler handler,
SettableListenableFuture<WebSocketSession> connectFuture) {
this(request, handler, connectFuture.completable());
}
protected AbstractClientSockJsSession(TransportRequest request, WebSocketHandler handler,
CompletableFuture<WebSocketSession> connectFuture) {
Assert.notNull(request, "'request' is required");
Assert.notNull(handler, "'handler' is required");
@ -242,7 +252,7 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
this.state = State.OPEN;
try {
this.webSocketHandler.afterConnectionEstablished(this);
this.connectFuture.set(this);
this.connectFuture.complete(this);
}
catch (Exception ex) {
if (logger.isErrorEnabled()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,7 +29,6 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.socket.TextMessage;
@ -91,8 +91,8 @@ public abstract class AbstractXhrTransport implements XhrTransport {
// Transport methods
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
public CompletableFuture<WebSocketSession> connectAsync(TransportRequest request, WebSocketHandler handler) {
CompletableFuture<WebSocketSession> connectFuture = new CompletableFuture<>();
XhrClientSockJsSession session = new XhrClientSockJsSession(request, handler, this, connectFuture);
request.addTimeoutTask(session.getTimeoutTask());
@ -109,9 +109,16 @@ public abstract class AbstractXhrTransport implements XhrTransport {
return connectFuture;
}
@Deprecated
protected void connectInternal(TransportRequest request, WebSocketHandler handler,
URI receiveUrl, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
throw new UnsupportedOperationException("connectInternal has been deprecated in favor of connectInternal");
}
protected abstract void connectInternal(TransportRequest request, WebSocketHandler handler,
URI receiveUrl, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture);
CompletableFuture<WebSocketSession> connectFuture);
// InfoReceiver methods

View File

@ -22,7 +22,9 @@ import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -146,17 +148,40 @@ class DefaultTransportRequest implements TransportRequest {
}
@Deprecated
public void connect(WebSocketHandler handler, SettableListenableFuture<WebSocketSession> future) {
if (logger.isTraceEnabled()) {
logger.trace("Starting " + this);
}
ConnectCallback connectCallback = new ConnectCallback(handler, future);
ListenableConnectCallback connectCallback = new ListenableConnectCallback(handler, future);
scheduleConnectTimeoutTask(connectCallback);
this.transport.connect(this, handler).addCallback(connectCallback);
}
public void connect(WebSocketHandler handler, CompletableFuture<WebSocketSession> future) {
if (logger.isTraceEnabled()) {
logger.trace("Starting " + this);
}
CompletableConnectCallback connectCallback = new CompletableConnectCallback(handler, future);
scheduleConnectTimeoutTask(connectCallback);
this.transport.connectAsync(this, handler).whenComplete(connectCallback);
}
private void scheduleConnectTimeoutTask(ConnectCallback connectHandler) {
private void scheduleConnectTimeoutTask(ListenableConnectCallback connectHandler) {
if (this.timeoutScheduler != null) {
if (logger.isTraceEnabled()) {
logger.trace("Scheduling connect to time out after " + this.timeoutValue + " ms.");
}
Instant timeoutDate = Instant.now().plus(this.timeoutValue, ChronoUnit.MILLIS);
this.timeoutScheduler.schedule(connectHandler, timeoutDate);
}
else if (logger.isTraceEnabled()) {
logger.trace("Connect timeout task not scheduled (no TaskScheduler configured).");
}
}
private void scheduleConnectTimeoutTask(CompletableConnectCallback connectHandler) {
if (this.timeoutScheduler != null) {
if (logger.isTraceEnabled()) {
logger.trace("Scheduling connect to time out after " + this.timeoutValue + " ms.");
@ -182,7 +207,8 @@ class DefaultTransportRequest implements TransportRequest {
* to connect. Also implements {@code Runnable} to handle a scheduled timeout
* callback.
*/
private class ConnectCallback implements ListenableFutureCallback<WebSocketSession>, Runnable {
@SuppressWarnings("deprecation")
private class ListenableConnectCallback implements ListenableFutureCallback<WebSocketSession>, Runnable {
private final WebSocketHandler handler;
@ -190,7 +216,7 @@ class DefaultTransportRequest implements TransportRequest {
private final AtomicBoolean handled = new AtomicBoolean();
public ConnectCallback(WebSocketHandler handler, SettableListenableFuture<WebSocketSession> future) {
public ListenableConnectCallback(WebSocketHandler handler, SettableListenableFuture<WebSocketSession> future) {
this.handler = handler;
this.future = future;
}
@ -250,4 +276,79 @@ class DefaultTransportRequest implements TransportRequest {
}
}
/**
* Updates the given (global) future based success or failure to connect for
* the entire SockJS request regardless of which transport actually managed
* to connect. Also implements {@code Runnable} to handle a scheduled timeout
* callback.
*/
private class CompletableConnectCallback
implements Runnable, BiConsumer<WebSocketSession, Throwable> {
private final WebSocketHandler handler;
private final CompletableFuture<WebSocketSession> future;
private final AtomicBoolean handled = new AtomicBoolean();
public CompletableConnectCallback(WebSocketHandler handler, CompletableFuture<WebSocketSession> future) {
this.handler = handler;
this.future = future;
}
@Override
public void accept(@Nullable WebSocketSession session, @Nullable Throwable throwable) {
if (session != null) {
if (this.handled.compareAndSet(false, true)) {
this.future.complete(session);
}
else if (logger.isErrorEnabled()) {
logger.error("Connect success/failure already handled for " + DefaultTransportRequest.this);
}
}
else if (throwable != null) {
handleFailure(throwable, false);
}
}
@Override
public void run() {
handleFailure(null, true);
}
private void handleFailure(@Nullable Throwable ex, boolean isTimeoutFailure) {
if (this.handled.compareAndSet(false, true)) {
if (isTimeoutFailure) {
String message = "Connect timed out for " + DefaultTransportRequest.this;
logger.error(message);
ex = new SockJsTransportFailureException(message, getSockJsUrlInfo().getSessionId(), ex);
}
if (fallbackRequest != null) {
logger.error(DefaultTransportRequest.this + " failed. Falling back on next transport.", ex);
fallbackRequest.connect(this.handler, this.future);
}
else {
logger.error("No more fallback transports after " + DefaultTransportRequest.this, ex);
if (ex != null) {
this.future.completeExceptionally(ex);
}
}
if (isTimeoutFailure) {
try {
for (Runnable runnable : timeoutTasks) {
runnable.run();
}
}
catch (Throwable ex2) {
logger.error("Transport failed to run timeout tasks for " + DefaultTransportRequest.this, ex2);
}
}
}
else {
logger.error("Connect success/failure events already took place for " +
DefaultTransportRequest.this + ". Ignoring this additional failure event.", ex);
}
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
@ -36,7 +37,6 @@ import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
@ -110,7 +110,7 @@ public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle
@Override
protected void connectInternal(TransportRequest transportRequest, WebSocketHandler handler,
URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
CompletableFuture<WebSocketSession> connectFuture) {
HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
SockJsResponseListener listener = new SockJsResponseListener(url, httpHeaders, session, connectFuture);
@ -197,12 +197,12 @@ public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle
private final XhrClientSockJsSession sockJsSession;
private final SettableListenableFuture<WebSocketSession> connectFuture;
private final CompletableFuture<WebSocketSession> connectFuture;
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
public SockJsResponseListener(URI url, HttpHeaders headers, XhrClientSockJsSession sockJsSession,
SettableListenableFuture<WebSocketSession> connectFuture) {
CompletableFuture<WebSocketSession> connectFuture) {
this.transportUrl = url;
this.receiveHeaders = headers;
@ -273,7 +273,7 @@ public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle
@Override
public void onFailure(Response response, Throwable failure) {
if (this.connectFuture.setException(failure)) {
if (this.connectFuture.completeExceptionally(failure)) {
return;
}
if (this.sockJsSession.isDisconnected()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
@ -34,7 +35,6 @@ import org.springframework.http.client.ClientHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RequestCallback;
import org.springframework.web.client.ResponseExtractor;
@ -99,7 +99,7 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport {
@Override
protected void connectInternal(final TransportRequest transportRequest, final WebSocketHandler handler,
final URI receiveUrl, final HttpHeaders handshakeHeaders, final XhrClientSockJsSession session,
final SettableListenableFuture<WebSocketSession> connectFuture) {
final CompletableFuture<WebSocketSession> connectFuture) {
getTaskExecutor().execute(() -> {
HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
@ -120,7 +120,7 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport {
}
catch (Exception ex) {
if (!connectFuture.isDone()) {
connectFuture.setException(ex);
connectFuture.completeExceptionally(ex);
}
else {
session.handleTransportError(ex);

View File

@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@ -35,8 +36,6 @@ import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
@ -229,16 +228,16 @@ public class SockJsClient implements WebSocketClient, Lifecycle {
@Override
public ListenableFuture<WebSocketSession> doHandshake(
public CompletableFuture<WebSocketSession> execute(
WebSocketHandler handler, String uriTemplate, Object... uriVars) {
Assert.notNull(uriTemplate, "uriTemplate must not be null");
URI uri = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode().toUri();
return doHandshake(handler, null, uri);
return execute(handler, null, uri);
}
@Override
public final ListenableFuture<WebSocketSession> doHandshake(
public final CompletableFuture<WebSocketSession> execute(
WebSocketHandler handler, @Nullable WebSocketHttpHeaders headers, URI url) {
Assert.notNull(handler, "WebSocketHandler is required");
@ -249,7 +248,7 @@ public class SockJsClient implements WebSocketClient, Lifecycle {
throw new IllegalArgumentException("Invalid scheme: '" + scheme + "'");
}
SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
CompletableFuture<WebSocketSession> connectFuture = new CompletableFuture<>();
try {
SockJsUrlInfo sockJsUrlInfo = new SockJsUrlInfo(url);
ServerInfo serverInfo = getServerInfo(sockJsUrlInfo, getHttpRequestHeaders(headers));
@ -259,7 +258,7 @@ public class SockJsClient implements WebSocketClient, Lifecycle {
if (logger.isErrorEnabled()) {
logger.error("Initial SockJS \"Info\" request to server failed, url=" + url, exception);
}
connectFuture.setException(exception);
connectFuture.completeExceptionally(exception);
}
return connectFuture;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,9 @@
package org.springframework.web.socket.sockjs.client;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
@ -43,7 +45,20 @@ public interface Transport {
* @param request the transport request.
* @param webSocketHandler the application handler to delegate lifecycle events to.
* @return a future to indicate success or failure to connect.
* @deprecated as of 6.0, in favor of {@link #connectAsync(TransportRequest, WebSocketHandler)}
*/
ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler webSocketHandler);
@Deprecated
default ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler webSocketHandler) {
return new CompletableToListenableFutureAdapter<>(connectAsync(request, webSocketHandler));
}
/**
* Connect the transport.
* @param request the transport request.
* @param webSocketHandler the application handler to delegate lifecycle events to.
* @return a future to indicate success or failure to connect.
* @since 6.0
*/
CompletableFuture<WebSocketSession> connectAsync(TransportRequest request, WebSocketHandler webSocketHandler);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -55,7 +56,6 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
@ -135,14 +135,14 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
@Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
CompletableFuture<WebSocketSession> connectFuture) {
executeReceiveRequest(request, receiveUrl, handshakeHeaders, session, connectFuture);
}
private void executeReceiveRequest(final TransportRequest transportRequest,
final URI url, final HttpHeaders headers, final XhrClientSockJsSession session,
final SettableListenableFuture<WebSocketSession> connectFuture) {
final CompletableFuture<WebSocketSession> connectFuture) {
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request for " + url);
@ -180,7 +180,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
private ClientCallback<ClientExchange> createReceiveCallback(final TransportRequest transportRequest,
final URI url, final HttpHeaders headers, final XhrClientSockJsSession sockJsSession,
final SettableListenableFuture<WebSocketSession> connectFuture) {
final CompletableFuture<WebSocketSession> connectFuture) {
return new ClientCallback<>() {
@Override
@ -231,7 +231,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
}
private void onFailure(Throwable failure) {
if (connectFuture.setException(failure)) {
if (connectFuture.completeExceptionally(failure)) {
return;
}
if (sockJsSession.isDisconnected()) {
@ -374,13 +374,13 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
private final XhrClientSockJsSession session;
private final SettableListenableFuture<WebSocketSession> connectFuture;
private final CompletableFuture<WebSocketSession> connectFuture;
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
public SockJsResponseListener(TransportRequest request, ClientConnection connection, URI url,
HttpHeaders headers, XhrClientSockJsSession sockJsSession,
SettableListenableFuture<WebSocketSession> connectFuture) {
CompletableFuture<WebSocketSession> connectFuture) {
this.request = request;
this.connection = connection;
@ -462,7 +462,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
public void onFailure(Throwable failure) {
IoUtils.safeClose(this.connection);
if (this.connectFuture.setException(failure)) {
if (this.connectFuture.completeExceptionally(failure)) {
return;
}
if (this.session.isDisconnected()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.web.socket.sockjs.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -42,13 +43,23 @@ public class WebSocketClientSockJsSession extends AbstractClientSockJsSession im
@Nullable
private WebSocketSession webSocketSession;
/**
* Create a new {@code WebSocketClientSockJsSession}.
* @deprecated as of 6.0, in favor of {@link #WebSocketClientSockJsSession(TransportRequest, WebSocketHandler, CompletableFuture)}
*/
@Deprecated
public WebSocketClientSockJsSession(TransportRequest request, WebSocketHandler handler,
SettableListenableFuture<WebSocketSession> connectFuture) {
super(request, handler, connectFuture);
}
public WebSocketClientSockJsSession(TransportRequest request, WebSocketHandler handler,
CompletableFuture<WebSocketSession> connectFuture) {
super(request, handler, connectFuture);
}
@Override
public Object getNativeSession() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,17 +19,14 @@ package org.springframework.web.socket.sockjs.client;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.Lifecycle;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
@ -73,9 +70,11 @@ public class WebSocketTransport implements Transport, Lifecycle {
return Collections.singletonList(TransportType.WEBSOCKET);
}
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
final SettableListenableFuture<WebSocketSession> future = new SettableListenableFuture<>();
public CompletableFuture<WebSocketSession> connectAsync(TransportRequest request,
WebSocketHandler handler) {
CompletableFuture<WebSocketSession> future = new CompletableFuture<>();
WebSocketClientSockJsSession session = new WebSocketClientSockJsSession(request, handler, future);
handler = new ClientSockJsWebSocketHandler(session);
request.addTimeoutTask(session.getTimeoutTask());
@ -85,21 +84,14 @@ public class WebSocketTransport implements Transport, Lifecycle {
if (logger.isDebugEnabled()) {
logger.debug("Starting WebSocket session on " + url);
}
this.webSocketClient.doHandshake(handler, headers, url).addCallback(
new ListenableFutureCallback<WebSocketSession>() {
@Override
public void onSuccess(@Nullable WebSocketSession webSocketSession) {
// WebSocket session ready, SockJS Session not yet
}
@Override
public void onFailure(Throwable ex) {
future.setException(ex);
}
});
this.webSocketClient.execute(handler, headers, url).whenComplete((webSocketSession, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
}
});
return future;
}
@Override
public void start() {
if (!isRunning()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
@ -53,7 +54,11 @@ public class XhrClientSockJsSession extends AbstractClientSockJsSession {
private int binaryMessageSizeLimit = -1;
/**
* Create a new {@code XhrClientSockJsSession}.
* @deprecated as of 6.0, in favor of {@link #XhrClientSockJsSession(TransportRequest, WebSocketHandler, XhrTransport, CompletableFuture)}
*/
@Deprecated
public XhrClientSockJsSession(TransportRequest request, WebSocketHandler handler,
XhrTransport transport, SettableListenableFuture<WebSocketSession> connectFuture) {
@ -67,6 +72,19 @@ public class XhrClientSockJsSession extends AbstractClientSockJsSession {
this.sendUrl = request.getSockJsUrlInfo().getTransportUrl(TransportType.XHR_SEND);
}
public XhrClientSockJsSession(TransportRequest request, WebSocketHandler handler,
XhrTransport transport, CompletableFuture<WebSocketSession> connectFuture) {
super(request, handler, connectFuture);
Assert.notNull(transport, "XhrTransport is required");
this.transport = transport;
this.headers = request.getHttpRequestHeaders();
this.sendHeaders = new HttpHeaders();
this.sendHeaders.putAll(this.headers);
this.sendHeaders.setContentType(MediaType.APPLICATION_JSON);
this.sendUrl = request.getSockJsUrlInfo().getTransportUrl(TransportType.XHR_SEND);
}
public HttpHeaders getHeaders() {
return this.headers;

View File

@ -22,6 +22,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
@ -35,7 +36,6 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.context.Lifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
@ -150,8 +150,8 @@ public abstract class AbstractWebSocketIntegrationTests {
return "ws://localhost:" + this.server.getPort();
}
protected ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler clientHandler, String endpointPath) {
return this.webSocketClient.doHandshake(clientHandler, getWsBaseUrl() + endpointPath);
protected CompletableFuture<WebSocketSession> execute(WebSocketHandler clientHandler, String endpointPath) {
return this.webSocketClient.execute(clientHandler, getWsBaseUrl() + endpointPath);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,13 +19,12 @@ package org.springframework.web.socket.client;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.springframework.context.Lifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
@ -112,21 +111,21 @@ public class WebSocketConnectionManagerTests {
}
@Override
public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler handler,
public CompletableFuture<WebSocketSession> execute(WebSocketHandler handler,
String uriTemplate, Object... uriVars) {
URI uri = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode().toUri();
return doHandshake(handler, null, uri);
return execute(handler, null, uri);
}
@Override
public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler handler,
public CompletableFuture<WebSocketSession> execute(WebSocketHandler handler,
WebSocketHttpHeaders headers, URI uri) {
this.webSocketHandler = handler;
this.headers = headers;
this.uri = uri;
return new ListenableFutureTask<>(() -> null);
return CompletableFuture.supplyAsync(() -> null);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -79,7 +79,7 @@ class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build();
try (WebSocketSession session = doHandshake(new TestClientWebSocketHandler(0, message), "/ws").get()) {
try (WebSocketSession session = execute(new TestClientWebSocketHandler(0, message), "/ws").get()) {
assertThat(session).isNotNull();
SimpleController controller = this.wac.getBean(SimpleController.class);
assertThat(controller.latch.await(TIMEOUT, TimeUnit.SECONDS)).isTrue();
@ -98,7 +98,7 @@ class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
try (WebSocketSession session = doHandshake(clientHandler, "/ws").get()) {
try (WebSocketSession session = execute(clientHandler, "/ws").get()) {
assertThat(session).isNotNull();
assertThat(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS)).isTrue();
}
@ -114,7 +114,7 @@ class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
try (WebSocketSession session = doHandshake(clientHandler, "/ws").get()) {
try (WebSocketSession session = execute(clientHandler, "/ws").get()) {
assertThat(session).isNotNull();
assertThat(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS)).isTrue();
@ -133,7 +133,7 @@ class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1);
try (WebSocketSession session = doHandshake(clientHandler, "/ws").get()) {
try (WebSocketSession session = execute(clientHandler, "/ws").get()) {
assertThat(session).isNotNull();
assertThat(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS)).isTrue();
String payload = clientHandler.actual.get(1).getPayload();
@ -153,7 +153,7 @@ class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
try (WebSocketSession session = doHandshake(clientHandler, "/ws").get()) {
try (WebSocketSession session = execute(clientHandler, "/ws").get()) {
assertThat(session).isNotNull();
assertThat(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS)).isTrue();
String payload = clientHandler.actual.get(1).getPayload();
@ -175,7 +175,7 @@ class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
try (WebSocketSession session = doHandshake(clientHandler, "/ws").get()) {
try (WebSocketSession session = execute(clientHandler, "/ws").get()) {
assertThat(session).isNotNull();
assertThat(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS)).isTrue();
String payload = clientHandler.actual.get(1).getPayload();

View File

@ -19,6 +19,7 @@ package org.springframework.web.socket.messaging;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import org.junit.jupiter.api.BeforeEach;
@ -39,7 +40,6 @@ import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
@ -82,7 +82,7 @@ public class WebSocketStompClientTests {
private ArgumentCaptor<WebSocketHandler> webSocketHandlerCaptor;
private SettableListenableFuture<WebSocketSession> handshakeFuture;
private CompletableFuture<WebSocketSession> handshakeFuture;
@BeforeEach
@ -93,8 +93,8 @@ public class WebSocketStompClientTests {
this.stompClient.setStompSession(this.stompSession);
this.webSocketHandlerCaptor = ArgumentCaptor.forClass(WebSocketHandler.class);
this.handshakeFuture = new SettableListenableFuture<>();
given(webSocketClient.doHandshake(this.webSocketHandlerCaptor.capture(), any(), any(URI.class)))
this.handshakeFuture = new CompletableFuture<>();
given(webSocketClient.execute(this.webSocketHandlerCaptor.capture(), any(), any(URI.class)))
.willReturn(this.handshakeFuture);
}
@ -104,7 +104,7 @@ public class WebSocketStompClientTests {
connect();
IllegalStateException handshakeFailure = new IllegalStateException("simulated exception");
this.handshakeFuture.setException(handshakeFailure);
this.handshakeFuture.completeExceptionally(handshakeFailure);
verify(this.stompSession).afterConnectFailure(same(handshakeFailure));
}
@ -202,7 +202,7 @@ public class WebSocketStompClientTests {
accessor.setDestination("/topic/foo");
byte[] payload = "payload".getBytes(StandardCharsets.UTF_8);
getTcpConnection().send(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
getTcpConnection().sendAsync(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
ArgumentCaptor<TextMessage> textMessageCaptor = ArgumentCaptor.forClass(TextMessage.class);
verify(this.webSocketSession).sendMessage(textMessageCaptor.capture());
@ -218,7 +218,7 @@ public class WebSocketStompClientTests {
accessor.setContentType(MimeTypeUtils.APPLICATION_OCTET_STREAM);
byte[] payload = "payload".getBytes(StandardCharsets.UTF_8);
getTcpConnection().send(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
getTcpConnection().sendAsync(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
ArgumentCaptor<BinaryMessage> binaryMessageCaptor = ArgumentCaptor.forClass(BinaryMessage.class);
verify(this.webSocketSession).sendMessage(binaryMessageCaptor.capture());
@ -309,9 +309,9 @@ public class WebSocketStompClientTests {
private WebSocketHandler connect() {
this.stompClient.connect("/foo", mock(StompSessionHandler.class));
this.stompClient.connectAsync("/foo", mock(StompSessionHandler.class));
verify(this.stompSession).getSessionFuture();
verify(this.stompSession).getSession();
verifyNoMoreInteractions(this.stompSession);
WebSocketHandler webSocketHandler = this.webSocketHandlerCaptor.getValue();

View File

@ -20,11 +20,11 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketExtension;
@ -56,7 +56,7 @@ public class ClientSockJsSessionTests {
private WebSocketHandler handler;
private SettableListenableFuture<WebSocketSession> connectFuture;
private CompletableFuture<WebSocketSession> connectFuture;
@BeforeEach
@ -65,7 +65,7 @@ public class ClientSockJsSessionTests {
Transport transport = mock(Transport.class);
TransportRequest request = new DefaultTransportRequest(urlInfo, null, null, transport, TransportType.XHR, CODEC);
this.handler = mock(WebSocketHandler.class);
this.connectFuture = new SettableListenableFuture<>();
this.connectFuture = new CompletableFuture<>();
this.session = new TestClientSockJsSession(request, this.handler, this.connectFuture);
}
@ -223,7 +223,7 @@ public class ClientSockJsSessionTests {
protected TestClientSockJsSession(TransportRequest request, WebSocketHandler handler,
SettableListenableFuture<WebSocketSession> connectFuture) {
CompletableFuture<WebSocketSession> connectFuture) {
super(request, handler, connectFuture);
}

View File

@ -19,7 +19,9 @@ package org.springframework.web.socket.sockjs.client;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -27,8 +29,6 @@ import org.mockito.ArgumentCaptor;
import org.springframework.http.HttpHeaders;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec;
import org.springframework.web.socket.sockjs.transport.TransportType;
@ -50,9 +50,9 @@ public class DefaultTransportRequestTests {
private static final Jackson2SockJsMessageCodec CODEC = new Jackson2SockJsMessageCodec();
private SettableListenableFuture<WebSocketSession> connectFuture;
private CompletableFuture<WebSocketSession> connectFuture;
private ListenableFutureCallback<WebSocketSession> connectCallback;
private BiConsumer<WebSocketSession, Throwable> connectCallback;
private TestTransport webSocketTransport;
@ -62,9 +62,9 @@ public class DefaultTransportRequestTests {
@SuppressWarnings("unchecked")
@BeforeEach
public void setup() throws Exception {
this.connectCallback = mock(ListenableFutureCallback.class);
this.connectFuture = new SettableListenableFuture<>();
this.connectFuture.addCallback(this.connectCallback);
this.connectCallback = mock(BiConsumer.class);
this.connectFuture = new CompletableFuture<>();
this.connectFuture.whenComplete(this.connectCallback);
this.webSocketTransport = new TestTransport("WebSocketTestTransport");
this.xhrTransport = new TestTransport("XhrTestTransport");
}
@ -75,7 +75,7 @@ public class DefaultTransportRequestTests {
DefaultTransportRequest request = createTransportRequest(this.webSocketTransport, TransportType.WEBSOCKET);
request.connect(null, this.connectFuture);
WebSocketSession session = mock(WebSocketSession.class);
this.webSocketTransport.getConnectCallback().onSuccess(session);
this.webSocketTransport.getConnectCallback().accept(session, null);
assertThat(this.connectFuture.get()).isSameAs(session);
}
@ -87,12 +87,12 @@ public class DefaultTransportRequestTests {
request1.connect(null, this.connectFuture);
// Transport error => fallback
this.webSocketTransport.getConnectCallback().onFailure(new IOException("Fake exception 1"));
this.webSocketTransport.getConnectCallback().accept(null, new IOException("Fake exception 1"));
assertThat(this.connectFuture.isDone()).isFalse();
assertThat(this.xhrTransport.invoked()).isTrue();
// Transport error => no more fallback
this.xhrTransport.getConnectCallback().onFailure(new IOException("Fake exception 2"));
this.xhrTransport.getConnectCallback().accept(null, new IOException("Fake exception 2"));
assertThat(this.connectFuture.isDone()).isTrue();
assertThatExceptionOfType(ExecutionException.class).isThrownBy(
this.connectFuture::get)

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -87,7 +87,7 @@ public class SockJsClientTests {
this.sockJsClient.doHandshake(handler, URL).addCallback(this.connectCallback);
assertThat(this.webSocketTransport.invoked()).isTrue();
WebSocketSession session = mock(WebSocketSession.class);
this.webSocketTransport.getConnectCallback().onSuccess(session);
this.webSocketTransport.getConnectCallback().accept(session, null);
verify(this.connectCallback).onSuccess(session);
verifyNoMoreInteractions(this.connectCallback);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,12 +20,13 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.mockito.ArgumentCaptor;
import org.springframework.http.HttpHeaders;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.lang.Nullable;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
@ -46,7 +47,8 @@ class TestTransport implements Transport {
private TransportRequest request;
private ListenableFuture future;
@Nullable
private CompletableFuture<WebSocketSession> future;
public TestTransport(String name) {
@ -67,17 +69,17 @@ class TestTransport implements Transport {
}
@SuppressWarnings("unchecked")
public ListenableFutureCallback<WebSocketSession> getConnectCallback() {
ArgumentCaptor<ListenableFutureCallback> captor = ArgumentCaptor.forClass(ListenableFutureCallback.class);
verify(this.future).addCallback(captor.capture());
public BiConsumer<WebSocketSession, Throwable> getConnectCallback() {
ArgumentCaptor<BiConsumer> captor = ArgumentCaptor.forClass(BiConsumer.class);
verify(this.future).whenComplete(captor.capture());
return captor.getValue();
}
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
public CompletableFuture<WebSocketSession> connectAsync(TransportRequest request, WebSocketHandler handler) {
this.request = request;
this.future = mock(ListenableFuture.class);
this.future = mock(CompletableFuture.class);
return this.future;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@
package org.springframework.web.socket.sockjs.client;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@ -25,7 +26,6 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
@ -142,7 +142,7 @@ public class XhrTransportTests {
@Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
CompletableFuture<WebSocketSession> connectFuture) {
this.actualHandshakeHeaders = handshakeHeaders;
this.actualSession = session;