diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java index 6e53fa95a5f..28ad81f9a5a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java @@ -19,12 +19,17 @@ package org.springframework.messaging.support.tcp; import java.net.InetSocketAddress; import org.springframework.messaging.Message; +import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import reactor.core.Environment; import reactor.core.composable.Composable; +import reactor.core.composable.Deferred; import reactor.core.composable.Promise; +import reactor.core.composable.Stream; +import reactor.core.composable.spec.Promises; import reactor.function.Consumer; +import reactor.function.support.SingleUseConsumer; import reactor.io.Buffer; import reactor.tcp.Reconnect; import reactor.tcp.TcpClient; @@ -59,27 +64,39 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override - public void connect(TcpConnectionHandler

connectionHandler) { - this.connect(connectionHandler, null); + public ListenableFuture connect(TcpConnectionHandler

connectionHandler) { + + Promise, Message

>> promise = this.tcpClient.open(); + composeConnectionHandling(promise, connectionHandler); + + return new AbstractPromiseToListenableFutureAdapter, Message

>, Void>(promise) { + @Override + protected Void adapt(TcpConnection, Message

> result) { + return null; + } + }; } @Override - public void connect(final TcpConnectionHandler

connectionHandler, + public ListenableFuture connect(final TcpConnectionHandler

connectionHandler, final ReconnectStrategy reconnectStrategy) { - Composable, Message

>> composable; + Assert.notNull(reconnectStrategy, "'reconnectStrategy' is required"); - if (reconnectStrategy != null) { - composable = this.tcpClient.open(new Reconnect() { - @Override - public Tuple2 reconnect(InetSocketAddress address, int attempt) { - return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt)); - } - }); - } - else { - composable = this.tcpClient.open(); - } + Stream, Message

>> stream = + this.tcpClient.open(new Reconnect() { + @Override + public Tuple2 reconnect(InetSocketAddress address, int attempt) { + return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt)); + } + }); + composeConnectionHandling(stream, connectionHandler); + + return new PassThroughPromiseToListenableFutureAdapter(toPromise(stream)); + } + + private void composeConnectionHandling(Composable, Message

>> composable, + final TcpConnectionHandler

connectionHandler) { composable.when(Throwable.class, new Consumer() { @Override @@ -108,6 +125,27 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ }); } + private Promise toPromise(Stream, Message

>> stream) { + + final Deferred> deferred = Promises.defer().get(); + + stream.consume(SingleUseConsumer.once(new Consumer, Message

>>() { + @Override + public void accept(TcpConnection, Message

> conn) { + deferred.accept((Void) null); + } + })); + + stream.when(Throwable.class, SingleUseConsumer.once(new Consumer() { + @Override + public void accept(Throwable throwable) { + deferred.accept(throwable); + } + })); + + return deferred.compose(); + } + @Override public ListenableFuture shutdown() { try { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java index 8e69a64962c..603fd1df63b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java @@ -32,7 +32,9 @@ public interface TcpConnection

{ /** * Send the given message. * @param message the message - * @return whether the send succeeded or not + * + * @return a ListenableFuture that can be used to determine when and if the + * message was successfully sent */ ListenableFuture send(Message

message); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java index f4b12e40c9d..86cbd9f425f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java @@ -32,19 +32,28 @@ public interface TcpOperations

{ * Open a new connection. * * @param connectionHandler a handler to manage the connection + * + * @return a ListenableFuture that can be used to determine when and if the + * connection is successfully established */ - void connect(TcpConnectionHandler

connectionHandler); + ListenableFuture connect(TcpConnectionHandler

connectionHandler); /** * Open a new connection and a strategy for reconnecting if the connection fails. * * @param connectionHandler a handler to manage the connection * @param reconnectStrategy a strategy for reconnecting + * + * @return a ListenableFuture that can be used to determine when and if the + * initial connection is successfully established */ - void connect(TcpConnectionHandler

connectionHandler, ReconnectStrategy reconnectStrategy); + ListenableFuture connect(TcpConnectionHandler

connectionHandler, ReconnectStrategy reconnectStrategy); /** * Shut down and close any open connections. + * + * @return a ListenableFuture that can be used to determine when and if the + * connection is successfully closed */ ListenableFuture shutdown();