Update TCP/Reactor

Issue: SPR-12599
This commit is contained in:
Rossen Stoyanchev 2015-04-24 07:12:53 -04:00
parent 74c0250525
commit 60b19c784d
7 changed files with 188 additions and 184 deletions

View File

@ -117,9 +117,9 @@ configure(allprojects) { project ->
} }
repositories { repositories {
mavenLocal()
maven { url "https://repo.spring.io/libs-release" } maven { url "https://repo.spring.io/libs-release" }
maven { url "https://repo.spring.io/milestone" } maven { url "https://repo.spring.io/milestone" }
maven { url "https://repo.spring.io/snapshot" } // for reactor 2.0.1 snapshot
} }
dependencies { dependencies {
@ -487,6 +487,7 @@ project("spring-messaging") {
compile(project(":spring-beans")) compile(project(":spring-beans"))
compile(project(":spring-core")) compile(project(":spring-core"))
compile(project(":spring-context")) compile(project(":spring-context"))
optional("io.projectreactor:reactor-core:${reactorVersion}")
optional("io.projectreactor:reactor-net:${reactorVersion}") { optional("io.projectreactor:reactor-net:${reactorVersion}") {
exclude group: "io.netty", module: "netty-all" exclude group: "io.netty", module: "netty-all"
} }
@ -497,7 +498,6 @@ project("spring-messaging") {
optional("org.eclipse.jetty.websocket:websocket-client:${jettyVersion}") optional("org.eclipse.jetty.websocket:websocket-client:${jettyVersion}")
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
testCompile(project(":spring-test")) testCompile(project(":spring-test"))
testCompile('org.slf4j:slf4j-log4j12:1.7.10')
testCompile("javax.inject:javax.inject-tck:1") testCompile("javax.inject:javax.inject-tck:1")
testCompile("javax.servlet:javax.servlet-api:3.1.0") testCompile("javax.servlet:javax.servlet-api:3.1.0")
testCompile("javax.validation:validation-api:1.0.0.GA") testCompile("javax.validation:validation-api:1.0.0.GA")

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2014 the original author or authors. * Copyright 2002-2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.

View File

@ -18,16 +18,17 @@ package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpOperations; import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient.TcpClientSpecFactory;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import reactor.Environment; import reactor.Environment;
import reactor.core.config.ConfigurationReader; import reactor.core.config.ConfigurationReader;
import reactor.core.config.DispatcherConfiguration; import reactor.core.config.DispatcherConfiguration;
import reactor.core.config.DispatcherType; import reactor.core.config.DispatcherType;
import reactor.core.config.ReactorConfiguration; import reactor.core.config.ReactorConfiguration;
import reactor.fn.Function; import reactor.io.net.Spec.TcpClientSpec;
import reactor.io.net.Spec;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Properties; import java.util.Properties;
/** /**
@ -52,40 +53,18 @@ public class Reactor2TcpStompClient extends StompClientSupport {
/** /**
* Create an instance with the given host and port. * Create an instance with the given host and port.
*
* @param host the host * @param host the host
* @param port the port * @param port the port
*/ */
public Reactor2TcpStompClient(final String host, final int port) { public Reactor2TcpStompClient(final String host, final int port) {
this.tcpClient = new Reactor2TcpClient<byte[]>(createNettyTcpClientFactory(host, port)); ConfigurationReader reader = new StompClientDispatcherConfigReader();
} Environment environment = new Environment(reader).assignErrorJournal();
StompTcpClientSpecFactory factory = new StompTcpClientSpecFactory(environment, host, port);
private Function<Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>>, this.tcpClient = new Reactor2TcpClient<byte[]>(factory);
Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>>> createNettyTcpClientFactory(
final String host, final int port
) {
final Environment environment = new Environment(new StompClientDispatcherConfigReader()).assignErrorJournal();
return new Function<Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>>,
Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>>>() {
@Override
public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>,
Message<byte[]>> spec) {
return spec
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.env(environment)
.dispatcher(environment.getCachedDispatchers("StompClient").get())
.connect(host, port);
}
};
} }
/** /**
* Create an instance with a pre-configured TCP client. * Create an instance with a pre-configured TCP client.
*
* @param tcpClient the client to use * @param tcpClient the client to use
*/ */
public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) { public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) {
@ -136,8 +115,35 @@ public class Reactor2TcpStompClient extends StompClientSupport {
String dispatcherName = "StompClient"; String dispatcherName = "StompClient";
DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP; DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP;
DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0); DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0);
return new ReactorConfiguration(Arrays.<DispatcherConfiguration>asList(config), dispatcherName, new Properties List<DispatcherConfiguration> configList = Arrays.<DispatcherConfiguration>asList(config);
()); return new ReactorConfiguration(configList, dispatcherName, new Properties());
}
}
private static class StompTcpClientSpecFactory
implements TcpClientSpecFactory<Message<byte[]>, Message<byte[]>> {
private final Environment environment;
private final String host;
private final int port;
public StompTcpClientSpecFactory(Environment environment, String host, int port) {
this.environment = environment;
this.host = host;
this.port = port;
}
@Override
public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
return tcpClientSpec
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.env(this.environment)
.dispatcher(this.environment.getCachedDispatchers("StompClient").get())
.connect(this.host, this.port);
} }
} }

View File

@ -54,7 +54,8 @@ abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements Listena
public void accept(S result) { public void accept(S result) {
try { try {
registry.success(adapt(result)); registry.success(adapt(result));
} catch (Throwable t) { }
catch (Throwable t) {
registry.failure(t); registry.failure(t);
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,12 +16,11 @@
package org.springframework.messaging.tcp.reactor; package org.springframework.messaging.tcp.reactor;
import reactor.rx.Promise; import reactor.rx.Promise;
/** /**
* A Promise-to-ListenableFutureAdapter where the source and the target from the Promise and * A Promise-to-ListenableFutureAdapter where the source and the target from
* the ListenableFuture respectively are of the same type. * the Promise and the ListenableFuture respectively are of the same type.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0

View File

@ -16,13 +16,17 @@
package org.springframework.messaging.tcp.reactor; package org.springframework.messaging.tcp.reactor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.messaging.Message; import reactor.Environment;
import org.springframework.messaging.tcp.ReconnectStrategy; import reactor.core.config.ConfigurationReader;
import org.springframework.messaging.tcp.TcpConnectionHandler; import reactor.core.config.DispatcherConfiguration;
import org.springframework.messaging.tcp.TcpOperations; import reactor.core.config.ReactorConfiguration;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.support.NamedDaemonThreadFactory; import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.BiFunction; import reactor.fn.BiFunction;
import reactor.fn.Consumer; import reactor.fn.Consumer;
@ -34,7 +38,7 @@ import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream; import reactor.io.net.ChannelStream;
import reactor.io.net.NetStreams; import reactor.io.net.NetStreams;
import reactor.io.net.Reconnect; import reactor.io.net.Reconnect;
import reactor.io.net.Spec; import reactor.io.net.Spec.TcpClientSpec;
import reactor.io.net.impl.netty.NettyClientSocketOptions; import reactor.io.net.impl.netty.NettyClientSocketOptions;
import reactor.io.net.impl.netty.tcp.NettyTcpClient; import reactor.io.net.impl.netty.tcp.NettyTcpClient;
import reactor.io.net.tcp.TcpClient; import reactor.io.net.tcp.TcpClient;
@ -43,16 +47,20 @@ import reactor.rx.Stream;
import reactor.rx.Streams; import reactor.rx.Streams;
import reactor.rx.action.Signal; import reactor.rx.action.Signal;
import java.net.InetSocketAddress; import org.springframework.messaging.Message;
import java.util.ArrayList; import org.springframework.messaging.tcp.ReconnectStrategy;
import java.util.List; import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
/** /**
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations} * An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the TCP client support of the Reactor project. * based on the TCP client support of the Reactor project.
* <p/> *
* This client will wrap N number of clients for N {@link #connect} calls (one client by connection). * <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
* i.e. a separate (Reactor) client instance for each connection.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @author Stephane Maldini * @author Stephane Maldini
@ -63,172 +71,136 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
private final Function<Spec.TcpClientSpec<Message<P>, Message<P>>, private final TcpClientSpecFactory<Message<P>, Message<P>> tcpClientSpecFactory;
Spec.TcpClientSpec<Message<P>, Message<P>>> tcpClientSpec;
private final List<TcpClient<Message<P>, Message<P>>> activeClients = private final List<TcpClient<Message<P>, Message<P>>> tcpClients =
new ArrayList<TcpClient<Message<P>, Message<P>>>(); new ArrayList<TcpClient<Message<P>, Message<P>>>();
/** /**
* A constructor that creates a {@link reactor.io.net.Spec.TcpClientSpec} factory with * A constructor that creates a {@link TcpClientSpec TcpClientSpec} factory
* a default {@link reactor.core.dispatch.SynchronousDispatcher} as a result of which * with a default {@link reactor.core.dispatch.SynchronousDispatcher}, i.e.
* network I/O is handled in Netty threads. Number of Netty threads can be tweaked with * relying on Netty threads. The number of Netty threads can be tweaked with
* the {@code reactor.tcp.ioThreadCount} System property. * the {@code reactor.tcp.ioThreadCount} System property. The network I/O
* <p> * threads will be shared amongst the active clients.
* The network I/O threads will be shared amongst the active clients.
* </p>
* <p/>
* <p>Also see the constructor accepting a ready Reactor * <p>Also see the constructor accepting a ready Reactor
* {@link reactor.io.net.Spec.TcpClientSpec} {@link Function} factory. * {@link TcpClientSpec} {@link Function} factory.
*
* @param host the host to connect to * @param host the host to connect to
* @param port the port to connect to * @param port the port to connect to
* @param codec the codec to use for encoding and decoding the TCP stream * @param codec the codec to use for encoding and decoding the TCP stream
*/ */
public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) { public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) {
//FIXME Should it be exposed in Spring ? final NioEventLoopGroup eventLoopGroup = initEventLoopGroup();
int ioThreadCount;
try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
} catch (Exception i) {
ioThreadCount = -1;
}
if (ioThreadCount <= 0l) {
ioThreadCount = Runtime.getRuntime().availableProcessors();
}
final NioEventLoopGroup eventLoopGroup = this.tcpClientSpecFactory = new TcpClientSpecFactory<Message<P>, Message<P>>() {
new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
this.tcpClientSpec = new Function<Spec.TcpClientSpec<Message<P>, Message<P>>,
Spec.TcpClientSpec<Message<P>, Message<P>>>() {
@Override @Override
public Spec.TcpClientSpec<Message<P>, Message<P>> apply(Spec.TcpClientSpec<Message<P>, Message<P>> public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
messageMessageTcpClientSpec) { return spec
return messageMessageTcpClientSpec .env(new Environment(new SynchronousDispatcherConfigReader()))
.codec(codec) .codec(codec)
//make connect dynamic or use reconnect strategy to LB onto cluster
.connect(host, port) .connect(host, port)
.options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup)); .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup));
} }
}; };
} }
private NioEventLoopGroup initEventLoopGroup() {
int ioThreadCount;
try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
}
catch (Exception i) {
ioThreadCount = -1;
}
if (ioThreadCount <= 0l) {
ioThreadCount = Runtime.getRuntime().availableProcessors();
}
return new NioEventLoopGroup(ioThreadCount,
new NamedDaemonThreadFactory("reactor-tcp-io"));
}
/** /**
* A constructor with a pre-configured {@link reactor.io.net.Spec.TcpClientSpec} {@link Function} factory. * A constructor with a pre-configured {@link TcpClientSpec} {@link Function}
* This might be used to add SSL or specific network parameters to the generated client configuration. * factory. This might be used to add SSL or specific network parameters to
* <p/> * the generated client configuration.
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating * <p><strong>NOTE:</strong> if the client is configured with a thread-creating
* dispatcher, you are responsible for cleaning them, e.g. using {@link reactor.core.Dispatcher#shutdown}. * dispatcher, you are responsible for cleaning them, e.g. using
* {@link reactor.core.Dispatcher#shutdown}.
* *
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation. * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation.
*/ */
public Reactor2TcpClient(Function<Spec.TcpClientSpec<Message<P>, Message<P>>, public Reactor2TcpClient(TcpClientSpecFactory<Message<P>, Message<P>> tcpClientSpecFactory) {
Spec.TcpClientSpec<Message<P>, Message<P>>> tcpClientSpecFactory) {
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
this.tcpClientSpec = tcpClientSpecFactory; this.tcpClientSpecFactory = tcpClientSpecFactory;
} }
@Override @Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) { public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) {
Class<NettyTcpClient> type = REACTOR_TCP_CLIENT_TYPE;
//create a new client TcpClient<Message<P>, Message<P>> tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory);
TcpClient<Message<P>, Message<P>> tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, tcpClientSpec);
//attach connection handler
composeConnectionHandling(tcpClient, connectionHandler); composeConnectionHandling(tcpClient, connectionHandler);
//open connection
Promise<Boolean> promise = tcpClient.open(); Promise<Boolean> promise = tcpClient.open();
return new BooleanToVoidAdapter(promise);
//adapt to ListenableFuture
return new AbstractPromiseToListenableFutureAdapter<Boolean, Void>(promise) {
@Override
protected Void adapt(Boolean result) {
return null;
}
};
} }
@Override @Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler, public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
final ReconnectStrategy reconnectStrategy) { Assert.notNull(strategy, "ReconnectStrategy must not be null");
Class<NettyTcpClient> type = REACTOR_TCP_CLIENT_TYPE;
Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null"); TcpClient<Message<P>, Message<P>> tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory);
composeConnectionHandling(tcpClient, handler);
Reconnect reconnect = new Reconnect() { Stream<Boolean> stream = tcpClient.open(new ReactorRectonnectAdapter(strategy));
@Override return new BooleanToVoidAdapter(stream.next());
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt));
}
};
//create a new client
TcpClient<Message<P>, Message<P>> tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, tcpClientSpec);
//attach connection handler
composeConnectionHandling(tcpClient, connectionHandler);
//open connection
Stream<Boolean> stream = tcpClient.open(reconnect);
//adapt to ListenableFuture with the next available connection
Promise<Void> promise = stream.next().map(
new Function<Boolean, Void>() {
@Override
public Void apply(Boolean ch) {
return null;
}
});
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
} }
private void composeConnectionHandling(final TcpClient<Message<P>, Message<P>> tcpClient, private void composeConnectionHandling(final TcpClient<Message<P>, Message<P>> tcpClient,
final TcpConnectionHandler<P> connectionHandler) { final TcpConnectionHandler<P> connectionHandler) {
synchronized (activeClients){ synchronized (this.tcpClients){
activeClients.add(tcpClient); this.tcpClients.add(tcpClient);
} }
tcpClient tcpClient
.finallyDo(new Consumer<Signal<ChannelStream<Message<P>,Message<P>>>>() { .finallyDo(new Consumer<Signal<ChannelStream<Message<P>, Message<P>>>>() {
@Override @Override
public void accept(Signal<ChannelStream<Message<P>,Message<P>>> signal) { public void accept(Signal<ChannelStream<Message<P>, Message<P>>> signal) {
synchronized (activeClients) { synchronized (tcpClients) {
activeClients.remove(tcpClient); tcpClients.remove(tcpClient);
} }
if(signal.isOnError()) { if (signal.isOnError()) {
connectionHandler.afterConnectFailure(signal.getThrowable()); connectionHandler.afterConnectFailure(signal.getThrowable());
} }
} }
}) })
.log("reactor.client")
.consume(new Consumer<ChannelStream<Message<P>, Message<P>>>() { .consume(new Consumer<ChannelStream<Message<P>, Message<P>>>() {
@Override @Override
public void accept(ChannelStream<Message<P>, Message<P>> connection) { public void accept(ChannelStream<Message<P>, Message<P>> connection) {
connection connection
.log("reactor.connection")
.finallyDo(new Consumer<Signal<Message<P>>>() { .finallyDo(new Consumer<Signal<Message<P>>>() {
@Override @Override
public void accept(Signal<Message<P>> signal) { public void accept(Signal<Message<P>> signal) {
if (signal.isOnError()) { if (signal.isOnError()) {
connectionHandler.handleFailure(signal.getThrowable()); connectionHandler.handleFailure(signal.getThrowable());
} else if (signal.isOnComplete()) { }
else if (signal.isOnComplete()) {
connectionHandler.afterConnectionClosed(); connectionHandler.afterConnectionClosed();
} }
} }
}) })
.consume(new Consumer<Message<P>>() { .consume(new Consumer<Message<P>>() {
@Override @Override
public void accept(Message<P> message) { public void accept(Message<P> message) {
connectionHandler.handleMessage(message); connectionHandler.handleMessage(message);
} }
}); });
connectionHandler.afterConnected(new Reactor2TcpConnection<P>(connection)); connectionHandler.afterConnected(new Reactor2TcpConnection<P>(connection));
} }
}); });
@ -236,12 +208,11 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@Override @Override
public ListenableFuture<Boolean> shutdown() { public ListenableFuture<Boolean> shutdown() {
final List<TcpClient<Message<P>, Message<P>>> clients; final List<TcpClient<Message<P>, Message<P>>> clients;
synchronized (activeClients){ synchronized (this.tcpClients) {
clients = new ArrayList<TcpClient<Message<P>, Message<P>>>(activeClients); clients = new ArrayList<TcpClient<Message<P>, Message<P>>>(this.tcpClients);
//should be cleared individually in tcpClient#finallyDo()
//activeClients.clear();
} }
Promise<Boolean> promise = Streams.from(clients) Promise<Boolean> promise = Streams.from(clients)
@ -259,11 +230,45 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}) })
.next(); .next();
return new AbstractPromiseToListenableFutureAdapter<Boolean, Boolean>(promise) { return new PassThroughPromiseToListenableFutureAdapter<Boolean>(promise);
@Override
protected Boolean adapt(Boolean result) {
return result;
}
};
} }
public interface TcpClientSpecFactory<I, O> extends Function<TcpClientSpec<I, O>, TcpClientSpec<I, O>> {
}
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
@Override
public ReactorConfiguration read() {
return new ReactorConfiguration(Arrays.<DispatcherConfiguration>asList(), "sync", new Properties());
}
}
private static class ReactorRectonnectAdapter implements Reconnect {
private final ReconnectStrategy strategy;
public ReactorRectonnectAdapter(ReconnectStrategy strategy) {
this.strategy = strategy;
}
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, strategy.getTimeToNextAttempt(attempt));
}
}
private static class BooleanToVoidAdapter extends AbstractPromiseToListenableFutureAdapter<Boolean, Void> {
public BooleanToVoidAdapter(Promise<Boolean> promise) {
super(promise);
}
@Override
protected Void adapt(Boolean result) {
return null;
}
}
} }

View File

@ -16,64 +16,57 @@
package org.springframework.messaging.tcp.reactor; package org.springframework.messaging.tcp.reactor;
import org.reactivestreams.Publisher; import reactor.fn.Functions;
import reactor.io.net.ChannelStream;
import reactor.rx.Promises;
import reactor.rx.broadcast.Broadcaster;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.Functions;
import reactor.io.net.ChannelStream;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.broadcast.Broadcaster;
import java.lang.reflect.Constructor;
/** /**
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection} * An implementation of {@link org.springframework.messaging.tcp.TcpConnection
* based on the TCP client support of the Reactor project. * TcpConnection} based on the TCP client support of the Reactor project.
*
* @param <P> the payload type of messages read or written to the TCP stream.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.2
* @param <P> the payload type of Spring Message's read from and written to
* the TCP stream
*/ */
public class Reactor2TcpConnection<P> implements TcpConnection<P> { public class Reactor2TcpConnection<P> implements TcpConnection<P> {
private final ChannelStream<Message<P>, Message<P>> channel; private final ChannelStream<Message<P>, Message<P>> channelStream;
private final Broadcaster<Message<P>> sink;
private final Broadcaster<Message<P>> sink;
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> connection) { public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream) {
this.channel = connection; this.channelStream = channelStream;
this.sink = Broadcaster.create(); this.sink = Broadcaster.create();
this.channelStream.sink(this.sink);
channel.sink(sink);
} }
@Override @Override
public ListenableFuture<Void> send(Message<P> message) { public ListenableFuture<Void> send(Message<P> message) {
sink.onNext(message); this.sink.onNext(message);
//FIXME need to align Reactor with Reactive IPC to have publish/confirm receipt
return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>success(null)); return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>success(null));
} }
@Override @Override
public void onReadInactivity(Runnable runnable, long inactivityDuration) { public void onReadInactivity(Runnable runnable, long inactivityDuration) {
this.channel.on().readIdle(inactivityDuration, Functions.<Void>consumer(runnable)); this.channelStream.on().readIdle(inactivityDuration, Functions.<Void>consumer(runnable));
} }
@Override @Override
public void onWriteInactivity(Runnable runnable, long inactivityDuration) { public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.channel.on().writeIdle(inactivityDuration, Functions.<Void>consumer(runnable)); this.channelStream.on().writeIdle(inactivityDuration, Functions.<Void>consumer(runnable));
} }
@Override @Override
public void close() { public void close() {
sink.onComplete(); this.sink.onComplete();
} }
} }