Upgrade to Reactor Netty 0.8

Issue: SPR-16387
This commit is contained in:
Violeta Georgieva 2018-05-17 21:47:18 +03:00 committed by Rossen Stoyanchev
parent 61ffbe5554
commit ffbc75ae47
20 changed files with 144 additions and 178 deletions

View File

@ -18,7 +18,7 @@ dependencies {
compile(project(":spring-core")) compile(project(":spring-core"))
optional(project(":spring-context")) optional(project(":spring-context"))
optional(project(":spring-oxm")) optional(project(":spring-oxm"))
optional("io.projectreactor.ipc:reactor-netty") optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api" exclude group: "javax.servlet", module: "javax.servlet-api"
} }

View File

@ -39,14 +39,13 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.FutureMono; import reactor.netty.Connection;
import reactor.ipc.netty.NettyContext; import reactor.netty.FutureMono;
import reactor.ipc.netty.NettyInbound; import reactor.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound; import reactor.netty.NettyOutbound;
import reactor.ipc.netty.options.ClientOptions; import reactor.netty.resources.ConnectionProvider;
import reactor.ipc.netty.resources.LoopResources; import reactor.netty.resources.LoopResources;
import reactor.ipc.netty.resources.PoolResources; import reactor.netty.tcp.TcpClient;
import reactor.ipc.netty.tcp.TcpClient;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
@ -83,7 +82,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private LoopResources loopResources; private LoopResources loopResources;
@Nullable @Nullable
private PoolResources poolResources; private ConnectionProvider poolResources;
private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler"); private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler");
@ -98,57 +97,18 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/ */
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) { public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
this(builder -> builder.host(host).port(port), codec); Assert.notNull(host, "host is required");
} Assert.notNull(port, "port is required");
/**
* Constructor with a {@link ClientOptions.Builder} that can be used to
* customize Reactor Netty client options.
*
* <p><strong>Note: </strong> this constructor manages the lifecycle of the
* {@link TcpClient} and its underlying resources. Please do not customize
* any of the following options:
* {@link ClientOptions.Builder#channelGroup(ChannelGroup) ChannelGroup},
* {@link ClientOptions.Builder#loopResources(LoopResources) LoopResources}, and
* {@link ClientOptions.Builder#poolResources(PoolResources) PoolResources}.
* You may set the {@link ClientOptions.Builder#disablePool() disablePool}
* option if you simply want to turn off pooling.
*
* <p>For full control over the initialization and lifecycle of the TcpClient,
* see {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
*
* @param optionsConsumer consumer to customize client options
* @param codec the code to use
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(Consumer<ClientOptions.Builder<?>> optionsConsumer,
ReactorNettyCodec<P> codec) {
Assert.notNull(optionsConsumer, "Consumer<ClientOptions.Builder<?> is required");
Assert.notNull(codec, "ReactorNettyCodec is required"); Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
Consumer<ClientOptions.Builder<?>> builtInConsumer = builder -> {
Assert.isTrue(!builder.isLoopAvailable() && !builder.isPoolAvailable(),
"The provided ClientOptions.Builder contains LoopResources and/or PoolResources. " +
"Please, use the constructor that accepts a TcpClient instance " +
"for full control over initialization and lifecycle.");
builder.channelGroup(this.channelGroup);
builder.preferNative(false);
this.loopResources = LoopResources.create("tcp-client-loop"); this.loopResources = LoopResources.create("tcp-client-loop");
builder.loopResources(this.loopResources); this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.tcpClient = TcpClient.create(poolResources)
if (!builder.isPoolDisabled()) { .host(host)
this.poolResources = PoolResources.elastic("tcp-client-pool"); .port(port)
builder.poolResources(this.poolResources); .runOn(loopResources, false)
} .doOnConnected(c -> channelGroup.add(c.channel()));
};
this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer));
this.codec = codec; this.codec = codec;
} }
@ -181,7 +141,8 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
} }
Mono<Void> connectMono = this.tcpClient Mono<Void> connectMono = this.tcpClient
.newHandler(new ReactorNettyHandler(handler)) .handle(new ReactorNettyHandler(handler))
.connect()
.doOnError(handler::afterConnectFailure) .doOnError(handler::afterConnectFailure)
.then(); .then();
@ -201,11 +162,12 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
MonoProcessor<Void> connectMono = MonoProcessor.create(); MonoProcessor<Void> connectMono = MonoProcessor.create();
this.tcpClient this.tcpClient
.newHandler(new ReactorNettyHandler(handler)) .handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(updateConnectMono(connectMono)) .doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono)) .doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler .doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(NettyContext::onClose) // post-connect issues .flatMap(Connection::onDispose) // post-connect issues
.retryWhen(reconnectFunction(strategy)) .retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy)) .repeatWhen(reconnectFunction(strategy))
.subscribe(); .subscribe();
@ -302,14 +264,16 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) { public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
inbound.withConnection(c -> {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Connected to " + inbound.remoteAddress()); logger.debug("Connected to " + c.address());
} }
});
DirectProcessor<Void> completion = DirectProcessor.create(); DirectProcessor<Void> completion = DirectProcessor.create();
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> connectionHandler.afterConnected(connection)); scheduler.schedule(() -> connectionHandler.afterConnected(connection));
inbound.context().addHandler(new StompMessageDecoder<>(codec)); inbound.withConnection(c -> c.addHandler(new StompMessageDecoder<>(codec)));
inbound.receiveObject() inbound.receiveObject()
.cast(Message.class) .cast(Message.class)

View File

@ -17,12 +17,10 @@
package org.springframework.messaging.tcp.reactor; package org.springframework.messaging.tcp.reactor;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline;
import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyInbound; import reactor.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound; import reactor.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnection;
@ -66,20 +64,13 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
@Override @Override
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void onReadInactivity(Runnable runnable, long inactivityDuration) { public void onReadInactivity(Runnable runnable, long inactivityDuration) {
// TODO: workaround for https://github.com/reactor/reactor-netty/issues/22 this.inbound.withConnection(c -> c.onReadIdle(inactivityDuration, runnable));
ChannelPipeline pipeline = this.inbound.context().channel().pipeline();
String name = NettyPipeline.OnChannelReadIdle;
if (pipeline.context(name) != null) {
pipeline.remove(name);
}
this.inbound.onReadIdle(inactivityDuration, runnable);
} }
@Override @Override
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void onWriteInactivity(Runnable runnable, long inactivityDuration) { public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.outbound.onWriteIdle(inactivityDuration, runnable); this.inbound.withConnection(c -> c.onWriteIdle(inactivityDuration, runnable));
} }
@Override @Override

View File

@ -80,7 +80,7 @@ dependencies {
testCompile("org.apache.httpcomponents:httpclient:4.5.5") { testCompile("org.apache.httpcomponents:httpclient:4.5.5") {
exclude group: "commons-logging", module: "commons-logging" exclude group: "commons-logging", module: "commons-logging"
} }
testCompile('io.projectreactor.ipc:reactor-netty') testCompile('io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT')
testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1') testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1')
// Pull in the latest JUnit 5 Launcher API and the Vintage engine as well // Pull in the latest JUnit 5 Launcher API and the Vintage engine as well
// so that we can run JUnit 4 tests in IntelliJ IDEA. // so that we can run JUnit 4 tests in IntelliJ IDEA.

View File

@ -34,7 +34,7 @@ dependencies {
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}") optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}")
optional("io.netty:netty-all") optional("io.netty:netty-all")
optional("io.projectreactor.ipc:reactor-netty") optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
optional("org.eclipse.jetty:jetty-server:${jettyVersion}") { optional("org.eclipse.jetty:jetty-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api" exclude group: "javax.servlet", module: "javax.servlet-api"

View File

@ -17,24 +17,25 @@
package org.springframework.http.client.reactive; package org.springframework.http.client.reactive;
import java.net.URI; import java.net.URI;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.client.HttpClient; import reactor.netty.NettyInbound;
import reactor.ipc.netty.http.client.HttpClientOptions; import reactor.netty.NettyOutbound;
import reactor.ipc.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClient;
import reactor.ipc.netty.http.client.HttpClientResponse; import reactor.netty.http.client.HttpClientRequest;
import reactor.ipc.netty.options.ClientOptions; import reactor.netty.http.client.HttpClientResponse;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import io.netty.buffer.ByteBufAllocator;
/** /**
* Reactor-Netty implementation of {@link ClientHttpConnector}. * Reactor-Netty implementation of {@link ClientHttpConnector}.
* *
* @author Brian Clozel * @author Brian Clozel
* @since 5.0 * @since 5.0
* @see reactor.ipc.netty.http.client.HttpClient * @see reactor.netty.http.client.HttpClient
*/ */
public class ReactorClientHttpConnector implements ClientHttpConnector { public class ReactorClientHttpConnector implements ClientHttpConnector {
@ -43,20 +44,19 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
/** /**
* Create a Reactor Netty {@link ClientHttpConnector} * Create a Reactor Netty {@link ClientHttpConnector}
* with default {@link ClientOptions} and HTTP compression support enabled. * with a default configuration and HTTP compression support enabled.
*/ */
public ReactorClientHttpConnector() { public ReactorClientHttpConnector() {
this.httpClient = HttpClient.builder() this.httpClient = HttpClient.create()
.options(options -> options.compression(true)) .compress();
.build();
} }
/** /**
* Create a Reactor Netty {@link ClientHttpConnector} with the given * Create a Reactor Netty {@link ClientHttpConnector} with the given
* {@link HttpClientOptions.Builder} * {@link HttpClient}
*/ */
public ReactorClientHttpConnector(Consumer<? super HttpClientOptions.Builder> clientOptions) { public ReactorClientHttpConnector(HttpClient httpClient) {
this.httpClient = HttpClient.create(clientOptions); this.httpClient = httpClient;
} }
@ -69,22 +69,24 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
} }
return this.httpClient return this.httpClient
.request(adaptHttpMethod(method), .request(adaptHttpMethod(method))
uri.toString(), .uri(uri.toString())
request -> requestCallback.apply(adaptRequest(method, uri, request))) .send((req, out) -> requestCallback.apply(adaptRequest(method, uri, req, out)))
.map(this::adaptResponse); .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
.next();
} }
private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) { private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) {
return io.netty.handler.codec.http.HttpMethod.valueOf(method.name()); return io.netty.handler.codec.http.HttpMethod.valueOf(method.name());
} }
private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request) { private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound out) {
return new ReactorClientHttpRequest(method, uri, request); return new ReactorClientHttpRequest(method, uri, request, out);
} }
private ClientHttpResponse adaptResponse(HttpClientResponse response) { private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound,
return new ReactorClientHttpResponse(response); ByteBufAllocator alloc) {
return new ReactorClientHttpResponse(response, nettyInbound, alloc);
} }
} }

View File

@ -25,7 +25,8 @@ import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.client.HttpClientRequest; import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClientRequest;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
@ -38,7 +39,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage;
* *
* @author Brian Clozel * @author Brian Clozel
* @since 5.0 * @since 5.0
* @see reactor.ipc.netty.http.client.HttpClient * @see reactor.netty.http.client.HttpClient
*/ */
class ReactorClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage { class ReactorClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage {
@ -48,15 +49,18 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
private final HttpClientRequest httpRequest; private final HttpClientRequest httpRequest;
private final NettyOutbound out;
private final NettyDataBufferFactory bufferFactory; private final NettyDataBufferFactory bufferFactory;
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri,
HttpClientRequest httpRequest) { HttpClientRequest httpRequest, NettyOutbound out) {
this.httpMethod = httpMethod; this.httpMethod = httpMethod;
this.uri = uri; this.uri = uri;
this.httpRequest = httpRequest.failOnClientError(false).failOnServerError(false); this.httpRequest = httpRequest;
this.bufferFactory = new NettyDataBufferFactory(httpRequest.alloc()); this.out = out;
this.bufferFactory = new NettyDataBufferFactory(out.alloc());
} }
@ -77,14 +81,14 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
@Override @Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> this.httpRequest return doCommit(() -> this.out
.send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then()); .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then());
} }
@Override @Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) { public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs); Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs);
return doCommit(() -> this.httpRequest.sendGroups(byteBufs).then()); return doCommit(() -> this.out.sendGroups(byteBufs).then());
} }
private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) { private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
@ -93,12 +97,12 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
@Override @Override
public Mono<Void> writeWith(File file, long position, long count) { public Mono<Void> writeWith(File file, long position, long count) {
return doCommit(() -> this.httpRequest.sendFile(file.toPath(), position, count).then()); return doCommit(() -> this.out.sendFile(file.toPath(), position, count).then());
} }
@Override @Override
public Mono<Void> setComplete() { public Mono<Void> setComplete() {
return doCommit(() -> httpRequest.sendHeaders().then()); return doCommit(() -> out.then());
} }
@Override @Override

View File

@ -19,7 +19,8 @@ package org.springframework.http.client.reactive;
import java.util.Collection; import java.util.Collection;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.ipc.netty.http.client.HttpClientResponse; import reactor.netty.NettyInbound;
import reactor.netty.http.client.HttpClientResponse;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -30,12 +31,14 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import io.netty.buffer.ByteBufAllocator;
/** /**
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client. * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
* *
* @author Brian Clozel * @author Brian Clozel
* @since 5.0 * @since 5.0
* @see reactor.ipc.netty.http.client.HttpClient * @see reactor.netty.http.client.HttpClient
*/ */
class ReactorClientHttpResponse implements ClientHttpResponse { class ReactorClientHttpResponse implements ClientHttpResponse {
@ -43,16 +46,21 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
private final HttpClientResponse response; private final HttpClientResponse response;
private final NettyInbound nettyInbound;
public ReactorClientHttpResponse(HttpClientResponse response) {
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound nettyInbound,
ByteBufAllocator alloc) {
this.response = response; this.response = response;
this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc()); this.nettyInbound = nettyInbound;
this.dataBufferFactory = new NettyDataBufferFactory(alloc);
} }
@Override @Override
public Flux<DataBuffer> getBody() { public Flux<DataBuffer> getBody() {
return response.receive() return nettyInbound
.receive()
.map(buf -> { .map(buf -> {
buf.retain(); buf.retain();
return dataBufferFactory.wrap(buf); return dataBufferFactory.wrap(buf);

View File

@ -23,8 +23,8 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerRequest;
import reactor.ipc.netty.http.server.HttpServerResponse; import reactor.netty.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;

View File

@ -21,12 +21,12 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.ipc.netty.http.server.HttpServerRequest; import reactor.netty.Connection;
import reactor.netty.http.server.HttpServerRequest;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -90,16 +90,14 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest {
} }
} }
else { else {
InetSocketAddress localAddress = (InetSocketAddress) request.context().channel().localAddress(); InetSocketAddress localAddress = request.hostAddress();
return new URI(scheme, null, localAddress.getHostString(), return new URI(scheme, null, localAddress.getHostString(),
localAddress.getPort(), null, null, null); localAddress.getPort(), null, null, null);
} }
} }
private static String getScheme(HttpServerRequest request) { private static String getScheme(HttpServerRequest request) {
ChannelPipeline pipeline = request.context().channel().pipeline(); return request.scheme();
boolean ssl = pipeline.get(SslHandler.class) != null;
return ssl ? "https" : "http";
} }
private static String resolveRequestUri(HttpServerRequest request) { private static String resolveRequestUri(HttpServerRequest request) {
@ -157,7 +155,7 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest {
@Nullable @Nullable
protected SslInfo initSslInfo() { protected SslInfo initSslInfo() {
SslHandler sslHandler = this.request.context().channel().pipeline().get(SslHandler.class); SslHandler sslHandler = ((Connection) this.request).channel().pipeline().get(SslHandler.class);
if (sslHandler != null) { if (sslHandler != null) {
SSLSession session = sslHandler.engine().getSession(); SSLSession session = sslHandler.engine().getSession();
return new DefaultSslInfo(session); return new DefaultSslInfo(session);

View File

@ -25,7 +25,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.server.HttpServerResponse; import reactor.netty.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;

View File

@ -18,7 +18,7 @@ package org.springframework.http.server.reactive.bootstrap;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import reactor.ipc.netty.NettyContext; import reactor.netty.DisposableServer;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
@ -29,15 +29,17 @@ public class ReactorHttpServer extends AbstractHttpServer {
private ReactorHttpHandlerAdapter reactorHandler; private ReactorHttpHandlerAdapter reactorHandler;
private reactor.ipc.netty.http.server.HttpServer reactorServer; private reactor.netty.http.server.HttpServer reactorServer;
private AtomicReference<NettyContext> nettyContext = new AtomicReference<>(); private AtomicReference<DisposableServer> disposableServer = new AtomicReference<>();
@Override @Override
protected void initServer() throws Exception { protected void initServer() throws Exception {
this.reactorHandler = createHttpHandlerAdapter(); this.reactorHandler = createHttpHandlerAdapter();
this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(getHost(), getPort()); this.reactorServer = reactor.netty.http.server.HttpServer.create()
.tcpConfiguration(tcpServer -> tcpServer.host(getHost()))
.port(getPort());
} }
private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { private ReactorHttpHandlerAdapter createHttpHandlerAdapter() {
@ -46,21 +48,21 @@ public class ReactorHttpServer extends AbstractHttpServer {
@Override @Override
protected void startInternal() { protected void startInternal() {
NettyContext nettyContext = this.reactorServer.newHandler(this.reactorHandler).block(); DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block();
setPort(nettyContext.address().getPort()); setPort(disposableServer.address().getPort());
this.nettyContext.set(nettyContext); this.disposableServer.set(disposableServer);
} }
@Override @Override
protected void stopInternal() { protected void stopInternal() {
this.nettyContext.get().dispose(); this.disposableServer.get().dispose();
} }
@Override @Override
protected void resetInternal() { protected void resetInternal() {
this.reactorServer = null; this.reactorServer = null;
this.reactorHandler = null; this.reactorHandler = null;
this.nettyContext.set(null); this.disposableServer.set(null);
} }
} }

View File

@ -18,7 +18,7 @@ package org.springframework.http.server.reactive.bootstrap;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import reactor.ipc.netty.NettyContext; import reactor.netty.DisposableServer;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
@ -29,17 +29,18 @@ public class ReactorHttpsServer extends AbstractHttpServer {
private ReactorHttpHandlerAdapter reactorHandler; private ReactorHttpHandlerAdapter reactorHandler;
private reactor.ipc.netty.http.server.HttpServer reactorServer; private reactor.netty.http.server.HttpServer reactorServer;
private AtomicReference<NettyContext> nettyContext = new AtomicReference<>(); private AtomicReference<DisposableServer> disposableServer = new AtomicReference<>();
@Override @Override
protected void initServer() throws Exception { protected void initServer() throws Exception {
this.reactorHandler = createHttpHandlerAdapter(); this.reactorHandler = createHttpHandlerAdapter();
this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(builder -> { this.reactorServer = reactor.netty.http.server.HttpServer.create()
builder.host(getHost()).port(getPort()).sslSelfSigned(); .tcpConfiguration(tcpServer -> tcpServer.host(getHost())
}); .secure())
.port(getPort());
} }
private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { private ReactorHttpHandlerAdapter createHttpHandlerAdapter() {
@ -48,21 +49,21 @@ public class ReactorHttpsServer extends AbstractHttpServer {
@Override @Override
protected void startInternal() { protected void startInternal() {
NettyContext nettyContext = this.reactorServer.newHandler(this.reactorHandler).block(); DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block();
setPort(nettyContext.address().getPort()); setPort(disposableServer.address().getPort());
this.nettyContext.set(nettyContext); this.disposableServer.set(disposableServer);
} }
@Override @Override
protected void stopInternal() { protected void stopInternal() {
this.nettyContext.get().dispose(); this.disposableServer.get().dispose();
} }
@Override @Override
protected void resetInternal() { protected void resetInternal() {
this.reactorServer = null; this.reactorServer = null;
this.reactorHandler = null; this.reactorHandler = null;
this.nettyContext.set(null); this.disposableServer.set(null);
} }
} }

View File

@ -28,7 +28,7 @@ dependencies {
optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${jackson2Version}") optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${jackson2Version}")
optional("io.reactivex:rxjava:${rxjavaVersion}") optional("io.reactivex:rxjava:${rxjavaVersion}")
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("io.projectreactor.ipc:reactor-netty") optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") { optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") {
exclude group: "org.apache.tomcat", module: "tomcat-websocket-api" exclude group: "org.apache.tomcat", module: "tomcat-websocket-api"
exclude group: "org.apache.tomcat", module: "tomcat-servlet-api" exclude group: "org.apache.tomcat", module: "tomcat-servlet-api"

View File

@ -20,11 +20,11 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyInbound; import reactor.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound; import reactor.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline; import reactor.netty.NettyPipeline;
import reactor.ipc.netty.http.websocket.WebsocketInbound; import reactor.netty.http.websocket.WebsocketInbound;
import reactor.ipc.netty.http.websocket.WebsocketOutbound; import reactor.netty.http.websocket.WebsocketOutbound;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;

View File

@ -21,9 +21,7 @@ import java.util.function.Consumer;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
import reactor.ipc.netty.http.client.HttpClientOptions;
import reactor.ipc.netty.http.client.HttpClientResponse;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
@ -48,15 +46,14 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
* Default constructor. * Default constructor.
*/ */
public ReactorNettyWebSocketClient() { public ReactorNettyWebSocketClient() {
this(options -> {}); this(HttpClient.create());
} }
/** /**
* Constructor that accepts an {@link HttpClientOptions.Builder} consumer * Constructor that accepts an existing {@link HttpClient} builder.
* to supply to {@link HttpClient#create(Consumer)}.
*/ */
public ReactorNettyWebSocketClient(Consumer<? super HttpClientOptions.Builder> clientOptions) { public ReactorNettyWebSocketClient(HttpClient httpClient) {
this.httpClient = HttpClient.create(clientOptions); this.httpClient = httpClient;
} }
@ -78,29 +75,28 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
List<String> protocols = beforeHandshake(url, headers, handler); List<String> protocols = beforeHandshake(url, headers, handler);
return getHttpClient() return getHttpClient()
.ws(url.toString(), .headers(nettyHeaders -> setNettyHeaders(headers, nettyHeaders))
nettyHeaders -> setNettyHeaders(headers, nettyHeaders), .websocket(StringUtils.collectionToCommaDelimitedString(protocols))
StringUtils.collectionToCommaDelimitedString(protocols)) .uri(url.toString())
.flatMap(response -> { .handle((in, out) -> {
HandshakeInfo info = afterHandshake(url, toHttpHeaders(response)); HandshakeInfo info = afterHandshake(url, toHttpHeaders(in.headers()));
ByteBufAllocator allocator = response.channel().alloc(); ByteBufAllocator allocator = out.alloc();
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator); NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
return response.receiveWebsocket((in, out) -> {
WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory); WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory);
return handler.handle(session); return handler.handle(session);
}); })
}); .next();
} }
private void setNettyHeaders(HttpHeaders headers, io.netty.handler.codec.http.HttpHeaders nettyHeaders) { private void setNettyHeaders(HttpHeaders headers, io.netty.handler.codec.http.HttpHeaders nettyHeaders) {
headers.forEach(nettyHeaders::set); headers.forEach(nettyHeaders::set);
} }
private HttpHeaders toHttpHeaders(HttpClientResponse response) { private HttpHeaders toHttpHeaders(io.netty.handler.codec.http.HttpHeaders responseHeaders) {
HttpHeaders headers = new HttpHeaders(); HttpHeaders headers = new HttpHeaders();
response.responseHeaders().forEach(entry -> { responseHeaders.forEach(entry -> {
String name = entry.getKey(); String name = entry.getKey();
headers.put(name, response.responseHeaders().getAll(name)); headers.put(name, responseHeaders.getAll(name));
}); });
return headers; return headers;
} }

View File

@ -75,7 +75,7 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
HandshakeWebSocketService.class.getClassLoader()); HandshakeWebSocketService.class.getClassLoader());
private static final boolean reactorNettyPresent = ClassUtils.isPresent( private static final boolean reactorNettyPresent = ClassUtils.isPresent(
"reactor.ipc.netty.http.server.HttpServerResponse", "reactor.netty.http.server.HttpServerResponse",
HandshakeWebSocketService.class.getClassLoader()); HandshakeWebSocketService.class.getClassLoader());

View File

@ -19,7 +19,7 @@ package org.springframework.web.reactive.socket.server.upgrade;
import java.util.function.Supplier; import java.util.function.Supplier;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.server.HttpServerResponse; import reactor.netty.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.server.reactive.AbstractServerHttpResponse; import org.springframework.http.server.reactive.AbstractServerHttpResponse;

View File

@ -44,5 +44,5 @@ dependencies {
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}")
testCompile("io.projectreactor.ipc:reactor-netty") testCompile("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
} }

View File

@ -366,7 +366,7 @@ and code snippets for each server:
|Server name|Group id|Artifact name |Server name|Group id|Artifact name
|Reactor Netty |Reactor Netty
|io.projectreactor.ipc |io.projectreactor.netty
|reactor-netty |reactor-netty
|Undertow |Undertow