From 7891c0d5ca208ef4f123f2dfa92cd13e6cb25042 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Mon, 27 Apr 2015 02:32:57 +0100 Subject: [PATCH] Update reactor2 support --- .../tcp/reactor/Reactor2TcpClient.java | 143 +++++++++--------- .../tcp/reactor/Reactor2TcpConnection.java | 11 +- 2 files changed, 76 insertions(+), 78 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index e40ad6a1f41..f454243ba2b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -29,14 +29,17 @@ import reactor.core.config.ConfigurationReader; import reactor.core.config.DispatcherConfiguration; import reactor.core.config.ReactorConfiguration; import reactor.core.support.NamedDaemonThreadFactory; -import reactor.fn.BiFunction; import reactor.fn.Consumer; import reactor.fn.Function; import reactor.fn.tuple.Tuple; import reactor.fn.tuple.Tuple2; import reactor.io.buffer.Buffer; import reactor.io.codec.Codec; -import reactor.io.net.*; +import reactor.io.net.ChannelStream; +import reactor.io.net.NetStreams; +import reactor.io.net.NetStreams.TcpClientFactory; +import reactor.io.net.ReactorChannelHandler; +import reactor.io.net.Reconnect; import reactor.io.net.Spec.TcpClientSpec; import reactor.io.net.impl.netty.NettyClientSocketOptions; import reactor.io.net.impl.netty.tcp.NettyTcpClient; @@ -71,7 +74,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ @SuppressWarnings("rawtypes") public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - private final NetStreams.TcpClientFactory, Message

> tcpClientSpecFactory; + private final TcpClientFactory, Message

> tcpClientSpecFactory; private final List, Message

>> tcpClients = new ArrayList, Message

>>(); @@ -94,7 +97,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ final NioEventLoopGroup eventLoopGroup = initEventLoopGroup(); - this.tcpClientSpecFactory = new NetStreams.TcpClientFactory, Message

>() { + this.tcpClientSpecFactory = new TcpClientFactory, Message

>() { @Override public TcpClientSpec, Message

> apply(TcpClientSpec, Message

> spec) { @@ -111,7 +114,8 @@ public class Reactor2TcpClient

implements TcpOperations

{ int ioThreadCount; try { ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); - } catch (Exception i) { + } + catch (Exception i) { ioThreadCount = -1; } if (ioThreadCount <= 0l) { @@ -132,20 +136,16 @@ public class Reactor2TcpClient

implements TcpOperations

{ * * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation. */ - public Reactor2TcpClient(NetStreams.TcpClientFactory, Message

> tcpClientSpecFactory) { + public Reactor2TcpClient(TcpClientFactory, Message

> tcpClientSpecFactory) { Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); this.tcpClientSpecFactory = tcpClientSpecFactory; } @Override - public ListenableFuture connect(TcpConnectionHandler

connectionHandler) { - Class type = REACTOR_TCP_CLIENT_TYPE; - - TcpClient, Message

> tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory); - - Promise promise = tcpClient.start(composeConnectionHandling(tcpClient, connectionHandler)); - + public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { + Assert.notNull(connectionHandler, "'connectionHandler' must not be null"); + Promise promise = createTcpClient().start(new MessageChannelStreamHandler

(connectionHandler)); return new PassThroughPromiseToListenableFutureAdapter( promise.onError(new Consumer() { @Override @@ -157,72 +157,33 @@ public class Reactor2TcpClient

implements TcpOperations

{ } @Override - public ListenableFuture connect(TcpConnectionHandler

handler, ReconnectStrategy strategy) { - Assert.notNull(strategy, "ReconnectStrategy must not be null"); - Class type = REACTOR_TCP_CLIENT_TYPE; + public ListenableFuture connect(TcpConnectionHandler

connectionHandler, ReconnectStrategy strategy) { + Assert.notNull(connectionHandler, "'connectionHandler' must not be null"); + Assert.notNull(strategy, "'reconnectStrategy' must not be null"); - TcpClient, Message

> tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory); - - Stream> stream = tcpClient.start( - composeConnectionHandling(tcpClient, handler), - new ReactorRectonnectAdapter(strategy) - ); + Stream> stream = createTcpClient().start( + new MessageChannelStreamHandler

(connectionHandler), + new ReactorReconnectAdapter(strategy)); return new PassThroughPromiseToListenableFutureAdapter(stream.next().after()); } - private MessageHandler

composeConnectionHandling( - final TcpClient, Message

> tcpClient, - final TcpConnectionHandler

connectionHandler - ) { - + private TcpClient, Message

> createTcpClient() { + Class type = REACTOR_TCP_CLIENT_TYPE; + TcpClient, Message

> tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory); synchronized (this.tcpClients) { this.tcpClients.add(tcpClient); } - - return new MessageHandler

() { - @Override - public Publisher apply(ChannelStream, Message

> connection) { - - Promise closePromise = Promises.prepare(); - - connectionHandler.afterConnected(new Reactor2TcpConnection

(connection, closePromise)); - - connection - .finallyDo(new Consumer>>() { - - @Override - public void accept(Signal> signal) { - if (signal.isOnError()) { - connectionHandler.handleFailure(signal.getThrowable()); - } else if (signal.isOnComplete()) { - connectionHandler.afterConnectionClosed(); - } - } - }) - .consume(new Consumer>() { - - @Override - public void accept(Message

message) { - connectionHandler.handleMessage(message); - } - }); - - return closePromise; - } - }; + return tcpClient; } @Override public ListenableFuture shutdown() { - - final List, Message

>> clients; - + final List, Message

>> readOnlyClients; synchronized (this.tcpClients) { - clients = new ArrayList, Message

>>(this.tcpClients); + readOnlyClients = new ArrayList, Message

>>(this.tcpClients); } - - Promise promise = Streams.from(clients) + Promise promise = Streams.from(readOnlyClients) .flatMap(new Function, Message

>, Promise>() { @Override public Promise apply(final TcpClient, Message

> client) { @@ -237,10 +198,10 @@ public class Reactor2TcpClient

implements TcpOperations

{ } }) .next(); - return new PassThroughPromiseToListenableFutureAdapter(promise); } + private static class SynchronousDispatcherConfigReader implements ConfigurationReader { @Override @@ -249,11 +210,52 @@ public class Reactor2TcpClient

implements TcpOperations

{ } } - private static class ReactorRectonnectAdapter implements Reconnect { + private static class MessageChannelStreamHandler

+ implements ReactorChannelHandler, Message

, ChannelStream, Message

>> { + + private final TcpConnectionHandler

connectionHandler; + + public MessageChannelStreamHandler(TcpConnectionHandler

connectionHandler) { + this.connectionHandler = connectionHandler; + } + + @Override + public Publisher apply(ChannelStream, Message

> channelStream) { + + Promise closePromise = Promises.prepare(); + + this.connectionHandler.afterConnected(new Reactor2TcpConnection

(channelStream, closePromise)); + + channelStream + .finallyDo(new Consumer>>() { + + @Override + public void accept(Signal> signal) { + if (signal.isOnError()) { + connectionHandler.handleFailure(signal.getThrowable()); + } + else if (signal.isOnComplete()) { + connectionHandler.afterConnectionClosed(); + } + } + }) + .consume(new Consumer>() { + + @Override + public void accept(Message

message) { + connectionHandler.handleMessage(message); + } + }); + + return closePromise; + } + } + + private static class ReactorReconnectAdapter implements Reconnect { private final ReconnectStrategy strategy; - public ReactorRectonnectAdapter(ReconnectStrategy strategy) { + public ReactorReconnectAdapter(ReconnectStrategy strategy) { this.strategy = strategy; } @@ -261,11 +263,6 @@ public class Reactor2TcpClient

implements TcpOperations

{ public Tuple2 reconnect(InetSocketAddress address, int attempt) { return Tuple.of(address, strategy.getTimeToNextAttempt(attempt)); } - - } - - private interface MessageHandler

- extends ReactorChannelHandler, Message

, ChannelStream, Message

>>{ } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java index ed198f9599f..ff2f8ba585e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java @@ -16,15 +16,11 @@ package org.springframework.messaging.tcp.reactor; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.springframework.util.concurrent.ListenableFutureAdapter; import reactor.fn.Functions; import reactor.io.net.ChannelStream; import reactor.rx.Promise; import reactor.rx.Promises; import reactor.rx.Streams; -import reactor.rx.broadcast.Broadcaster; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpConnection; @@ -42,9 +38,13 @@ import org.springframework.util.concurrent.ListenableFuture; public class Reactor2TcpConnection

implements TcpConnection

{ private final ChannelStream, Message

> channelStream; + private final Promise closePromise; - public Reactor2TcpConnection(ChannelStream, Message

> channelStream, Promise closePromise) { + + public Reactor2TcpConnection(ChannelStream, Message

> channelStream, + Promise closePromise) { + this.channelStream = channelStream; this.closePromise = closePromise; } @@ -71,4 +71,5 @@ public class Reactor2TcpConnection

implements TcpConnection

{ public void close() { this.closePromise.onComplete(); } + }