diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index e40ad6a1f41..f454243ba2b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -29,14 +29,17 @@ import reactor.core.config.ConfigurationReader; import reactor.core.config.DispatcherConfiguration; import reactor.core.config.ReactorConfiguration; import reactor.core.support.NamedDaemonThreadFactory; -import reactor.fn.BiFunction; import reactor.fn.Consumer; import reactor.fn.Function; import reactor.fn.tuple.Tuple; import reactor.fn.tuple.Tuple2; import reactor.io.buffer.Buffer; import reactor.io.codec.Codec; -import reactor.io.net.*; +import reactor.io.net.ChannelStream; +import reactor.io.net.NetStreams; +import reactor.io.net.NetStreams.TcpClientFactory; +import reactor.io.net.ReactorChannelHandler; +import reactor.io.net.Reconnect; import reactor.io.net.Spec.TcpClientSpec; import reactor.io.net.impl.netty.NettyClientSocketOptions; import reactor.io.net.impl.netty.tcp.NettyTcpClient; @@ -71,7 +74,7 @@ public class Reactor2TcpClient
implements TcpOperations
{
@SuppressWarnings("rawtypes")
public static final Class > tcpClientSpecFactory;
+ private final TcpClientFactory > tcpClientSpecFactory;
private final List >> tcpClients =
new ArrayList >>();
@@ -94,7 +97,7 @@ public class Reactor2TcpClient implements TcpOperations {
final NioEventLoopGroup eventLoopGroup = initEventLoopGroup();
- this.tcpClientSpecFactory = new NetStreams.TcpClientFactory >() {
+ this.tcpClientSpecFactory = new TcpClientFactory >() {
@Override
public TcpClientSpec > apply(TcpClientSpec > spec) {
@@ -111,7 +114,8 @@ public class Reactor2TcpClient implements TcpOperations {
int ioThreadCount;
try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
- } catch (Exception i) {
+ }
+ catch (Exception i) {
ioThreadCount = -1;
}
if (ioThreadCount <= 0l) {
@@ -132,20 +136,16 @@ public class Reactor2TcpClient implements TcpOperations {
*
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation.
*/
- public Reactor2TcpClient(NetStreams.TcpClientFactory > tcpClientSpecFactory) {
+ public Reactor2TcpClient(TcpClientFactory > tcpClientSpecFactory) {
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
this.tcpClientSpecFactory = tcpClientSpecFactory;
}
@Override
- public ListenableFuture connectionHandler) {
- Class > tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory);
-
- Promise connectionHandler) {
+ Assert.notNull(connectionHandler, "'connectionHandler' must not be null");
+ Promise (connectionHandler));
return new PassThroughPromiseToListenableFutureAdapter implements TcpOperations {
}
@Override
- public ListenableFuture handler, ReconnectStrategy strategy) {
- Assert.notNull(strategy, "ReconnectStrategy must not be null");
- Class connectionHandler, ReconnectStrategy strategy) {
+ Assert.notNull(connectionHandler, "'connectionHandler' must not be null");
+ Assert.notNull(strategy, "'reconnectStrategy' must not be null");
- TcpClient > tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory);
-
- Stream (connectionHandler),
+ new ReactorReconnectAdapter(strategy));
return new PassThroughPromiseToListenableFutureAdapter composeConnectionHandling(
- final TcpClient > tcpClient,
- final TcpConnectionHandler connectionHandler
- ) {
-
+ private TcpClient > createTcpClient() {
+ Class > tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory);
synchronized (this.tcpClients) {
this.tcpClients.add(tcpClient);
}
-
- return new MessageHandler () {
- @Override
- public Publisher > connection) {
-
- Promise (connection, closePromise));
-
- connection
- .finallyDo(new Consumer message) {
- connectionHandler.handleMessage(message);
- }
- });
-
- return closePromise;
- }
- };
+ return tcpClient;
}
@Override
public ListenableFuture >> clients;
-
+ final List >> readOnlyClients;
synchronized (this.tcpClients) {
- clients = new ArrayList >>(this.tcpClients);
+ readOnlyClients = new ArrayList >>(this.tcpClients);
}
-
- Promise >, Promise > client) {
@@ -237,10 +198,10 @@ public class Reactor2TcpClient implements TcpOperations {
}
})
.next();
-
return new PassThroughPromiseToListenableFutureAdapter implements TcpOperations {
}
}
- private static class ReactorRectonnectAdapter implements Reconnect {
+ private static class MessageChannelStreamHandler
+ implements ReactorChannelHandler , ChannelStream >> {
+
+ private final TcpConnectionHandler connectionHandler;
+
+ public MessageChannelStreamHandler(TcpConnectionHandler connectionHandler) {
+ this.connectionHandler = connectionHandler;
+ }
+
+ @Override
+ public Publisher > channelStream) {
+
+ Promise (channelStream, closePromise));
+
+ channelStream
+ .finallyDo(new Consumer message) {
+ connectionHandler.handleMessage(message);
+ }
+ });
+
+ return closePromise;
+ }
+ }
+
+ private static class ReactorReconnectAdapter implements Reconnect {
private final ReconnectStrategy strategy;
- public ReactorRectonnectAdapter(ReconnectStrategy strategy) {
+ public ReactorReconnectAdapter(ReconnectStrategy strategy) {
this.strategy = strategy;
}
@@ -261,11 +263,6 @@ public class Reactor2TcpClient implements TcpOperations {
public Tuple2
- extends ReactorChannelHandler , ChannelStream >>{
}
}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java
index ed198f9599f..ff2f8ba585e 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java
@@ -16,15 +16,11 @@
package org.springframework.messaging.tcp.reactor;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-import org.springframework.util.concurrent.ListenableFutureAdapter;
import reactor.fn.Functions;
import reactor.io.net.ChannelStream;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Streams;
-import reactor.rx.broadcast.Broadcaster;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
@@ -42,9 +38,13 @@ import org.springframework.util.concurrent.ListenableFuture;
public class Reactor2TcpConnection implements TcpConnection {
private final ChannelStream > channelStream;
+
private final Promise > channelStream, Promise > channelStream,
+ Promise implements TcpConnection {
public void close() {
this.closePromise.onComplete();
}
+
}