Upgrade to reactor 1.0.1.RELEASE

This commit is contained in:
Rossen Stoyanchev 2014-03-17 17:10:53 -04:00
parent 92eb99a5ab
commit 918e21fd8f
2 changed files with 43 additions and 14 deletions

View File

@ -390,8 +390,8 @@ project("spring-messaging") {
compile(project(":spring-beans"))
compile(project(":spring-core"))
compile(project(":spring-context"))
optional("org.projectreactor:reactor-core:1.0.0.RELEASE")
optional("org.projectreactor:reactor-tcp:1.0.0.RELEASE")
optional("org.projectreactor:reactor-core:1.0.1.RELEASE")
optional("org.projectreactor:reactor-tcp:1.0.1.RELEASE")
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api"
}
@ -607,8 +607,8 @@ project("spring-websocket") {
testCompile("org.apache.tomcat.embed:tomcat-embed-core:8.0.3")
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:8.0.3")
testCompile("org.apache.tomcat.embed:tomcat-embed-logging-juli:8.0.3")
testCompile("org.projectreactor:reactor-core:1.0.0.RELEASE")
testCompile("org.projectreactor:reactor-tcp:1.0.0.RELEASE")
testCompile("org.projectreactor:reactor-core:1.0.1.RELEASE")
testCompile("org.projectreactor:reactor-tcp:1.0.1.RELEASE")
testCompile("log4j:log4j:1.2.17")
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
}

View File

@ -16,6 +16,7 @@
package org.springframework.messaging.tcp.reactor;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
@ -48,7 +49,7 @@ import reactor.tuple.Tuple2;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the Reactor project.
* based on the TCP client support of the Reactor project.
*
* @author Rossen Stoyanchev
* @since 4.0
@ -60,7 +61,9 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
private final static Log logger = LogFactory.getLog(ReactorTcpClient.class);
private TcpClient<Message<P>, Message<P>> tcpClient;
private final TcpClient<Message<P>, Message<P>> tcpClient;
private final Environment environment;
/**
@ -76,12 +79,18 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
* @param codec the codec to use for encoding and decoding the TCP stream
*/
public ReactorTcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) {
// Revisit in 1.1: is Environment still required w/ sync dispatcher?
this.environment = new Environment();
this.tcpClient = new TcpClientSpec<Message<P>, Message<P>>(REACTOR_TCP_CLIENT_TYPE)
.env(new Environment())
.env(this.environment)
.codec(codec)
.connect(host, port)
.synchronousDispatcher()
.get();
checkReactorVersion();
}
/**
@ -96,6 +105,20 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
public ReactorTcpClient(TcpClient<Message<P>, Message<P>> tcpClient) {
Assert.notNull(tcpClient, "'tcpClient' must not be null");
this.tcpClient = tcpClient;
this.environment = null;
checkReactorVersion();
}
private static void checkReactorVersion() {
Class<?> type = null;
try {
type = ReactorTcpClient.class.getClassLoader().loadClass("reactor.event.dispatch.BaseDispatcher");
Assert.isTrue(Modifier.isPublic(type.getModifiers()),
"Detected older version of reactor-tcp. Switch to 1.0.1.RELEASE or higher.");
}
catch (ClassNotFoundException e) {
// Ignore, must be 1.1+
}
}
@ -190,13 +213,19 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
@Override
public ListenableFuture<Void> shutdown() {
Promise<Void> promise = this.tcpClient.close();
return new AbstractPromiseToListenableFutureAdapter<Void, Void>(promise) {
@Override
protected Void adapt(Void result) {
return result;
}
};
try {
Promise<Void> promise = this.tcpClient.close();
return new AbstractPromiseToListenableFutureAdapter<Void, Void>(promise) {
@Override
protected Void adapt(Void result) {
return result;
}
};
}
finally {
this.environment.shutdown();
}
}
}