diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index cc73a22cbee..f1da8e7c5f6 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -18,7 +18,7 @@ dependencies { compile(project(":spring-core")) optional(project(":spring-context")) optional(project(":spring-oxm")) - optional("io.projectreactor.ipc:reactor-netty") + optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT") optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 83cce81f0d8..0277ebaa43c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -39,14 +39,13 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.ipc.netty.FutureMono; -import reactor.ipc.netty.NettyContext; -import reactor.ipc.netty.NettyInbound; -import reactor.ipc.netty.NettyOutbound; -import reactor.ipc.netty.options.ClientOptions; -import reactor.ipc.netty.resources.LoopResources; -import reactor.ipc.netty.resources.PoolResources; -import reactor.ipc.netty.tcp.TcpClient; +import reactor.netty.Connection; +import reactor.netty.FutureMono; +import reactor.netty.NettyInbound; +import reactor.netty.NettyOutbound; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; +import reactor.netty.tcp.TcpClient; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -83,7 +82,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ private LoopResources loopResources; @Nullable - private PoolResources poolResources; + private ConnectionProvider poolResources; private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler"); @@ -98,57 +97,18 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec

codec) { - this(builder -> builder.host(host).port(port), codec); - } - - /** - * Constructor with a {@link ClientOptions.Builder} that can be used to - * customize Reactor Netty client options. - * - *

Note: this constructor manages the lifecycle of the - * {@link TcpClient} and its underlying resources. Please do not customize - * any of the following options: - * {@link ClientOptions.Builder#channelGroup(ChannelGroup) ChannelGroup}, - * {@link ClientOptions.Builder#loopResources(LoopResources) LoopResources}, and - * {@link ClientOptions.Builder#poolResources(PoolResources) PoolResources}. - * You may set the {@link ClientOptions.Builder#disablePool() disablePool} - * option if you simply want to turn off pooling. - * - *

For full control over the initialization and lifecycle of the TcpClient, - * see {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}. - * - * @param optionsConsumer consumer to customize client options - * @param codec the code to use - * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec - */ - public ReactorNettyTcpClient(Consumer> optionsConsumer, - ReactorNettyCodec

codec) { - - Assert.notNull(optionsConsumer, "Consumer is required"); + Assert.notNull(host, "host is required"); + Assert.notNull(port, "port is required"); Assert.notNull(codec, "ReactorNettyCodec is required"); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - - Consumer> builtInConsumer = builder -> { - - Assert.isTrue(!builder.isLoopAvailable() && !builder.isPoolAvailable(), - "The provided ClientOptions.Builder contains LoopResources and/or PoolResources. " + - "Please, use the constructor that accepts a TcpClient instance " + - "for full control over initialization and lifecycle."); - - builder.channelGroup(this.channelGroup); - builder.preferNative(false); - - this.loopResources = LoopResources.create("tcp-client-loop"); - builder.loopResources(this.loopResources); - - if (!builder.isPoolDisabled()) { - this.poolResources = PoolResources.elastic("tcp-client-pool"); - builder.poolResources(this.poolResources); - } - }; - - this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer)); + this.loopResources = LoopResources.create("tcp-client-loop"); + this.poolResources = ConnectionProvider.elastic("tcp-client-pool"); + this.tcpClient = TcpClient.create(poolResources) + .host(host) + .port(port) + .runOn(loopResources, false) + .doOnConnected(c -> channelGroup.add(c.channel())); this.codec = codec; } @@ -181,7 +141,8 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } Mono connectMono = this.tcpClient - .newHandler(new ReactorNettyHandler(handler)) + .handle(new ReactorNettyHandler(handler)) + .connect() .doOnError(handler::afterConnectFailure) .then(); @@ -201,11 +162,12 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ MonoProcessor connectMono = MonoProcessor.create(); this.tcpClient - .newHandler(new ReactorNettyHandler(handler)) + .handle(new ReactorNettyHandler(handler)) + .connect() .doOnNext(updateConnectMono(connectMono)) .doOnError(updateConnectMono(connectMono)) .doOnError(handler::afterConnectFailure) // report all connect failures to the handler - .flatMap(NettyContext::onClose) // post-connect issues + .flatMap(Connection::onDispose) // post-connect issues .retryWhen(reconnectFunction(strategy)) .repeatWhen(reconnectFunction(strategy)) .subscribe(); @@ -302,14 +264,16 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override @SuppressWarnings("unchecked") public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { - if (logger.isDebugEnabled()) { - logger.debug("Connected to " + inbound.remoteAddress()); - } + inbound.withConnection(c -> { + if (logger.isDebugEnabled()) { + logger.debug("Connected to " + c.address()); + } + }); DirectProcessor completion = DirectProcessor.create(); TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); scheduler.schedule(() -> connectionHandler.afterConnected(connection)); - inbound.context().addHandler(new StompMessageDecoder<>(codec)); + inbound.withConnection(c -> c.addHandler(new StompMessageDecoder<>(codec))); inbound.receiveObject() .cast(Message.class) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index c4107f7e9ed..e130bcefce7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -17,12 +17,10 @@ package org.springframework.messaging.tcp.reactor; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelPipeline; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Mono; -import reactor.ipc.netty.NettyInbound; -import reactor.ipc.netty.NettyOutbound; -import reactor.ipc.netty.NettyPipeline; +import reactor.netty.NettyInbound; +import reactor.netty.NettyOutbound; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpConnection; @@ -66,20 +64,13 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ @Override @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { - // TODO: workaround for https://github.com/reactor/reactor-netty/issues/22 - ChannelPipeline pipeline = this.inbound.context().channel().pipeline(); - String name = NettyPipeline.OnChannelReadIdle; - if (pipeline.context(name) != null) { - pipeline.remove(name); - } - - this.inbound.onReadIdle(inactivityDuration, runnable); + this.inbound.withConnection(c -> c.onReadIdle(inactivityDuration, runnable)); } @Override @SuppressWarnings("deprecation") public void onWriteInactivity(Runnable runnable, long inactivityDuration) { - this.outbound.onWriteIdle(inactivityDuration, runnable); + this.inbound.withConnection(c -> c.onWriteIdle(inactivityDuration, runnable)); } @Override diff --git a/spring-test/spring-test.gradle b/spring-test/spring-test.gradle index eb7df0d3e0a..516e6c04fa7 100644 --- a/spring-test/spring-test.gradle +++ b/spring-test/spring-test.gradle @@ -80,7 +80,7 @@ dependencies { testCompile("org.apache.httpcomponents:httpclient:4.5.5") { exclude group: "commons-logging", module: "commons-logging" } - testCompile('io.projectreactor.ipc:reactor-netty') + testCompile('io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT') testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1') // Pull in the latest JUnit 5 Launcher API and the Vintage engine as well // so that we can run JUnit 4 tests in IntelliJ IDEA. diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index c716a81e26a..f2162a2984d 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -34,7 +34,7 @@ dependencies { optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}") optional("io.netty:netty-all") - optional("io.projectreactor.ipc:reactor-netty") + optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT") optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") optional("org.eclipse.jetty:jetty-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 2ee616b1fac..ab21605ee9f 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -17,24 +17,25 @@ package org.springframework.http.client.reactive; import java.net.URI; -import java.util.function.Consumer; import java.util.function.Function; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.client.HttpClient; -import reactor.ipc.netty.http.client.HttpClientOptions; -import reactor.ipc.netty.http.client.HttpClientRequest; -import reactor.ipc.netty.http.client.HttpClientResponse; -import reactor.ipc.netty.options.ClientOptions; +import reactor.netty.NettyInbound; +import reactor.netty.NettyOutbound; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientRequest; +import reactor.netty.http.client.HttpClientResponse; import org.springframework.http.HttpMethod; +import io.netty.buffer.ByteBufAllocator; + /** * Reactor-Netty implementation of {@link ClientHttpConnector}. * * @author Brian Clozel * @since 5.0 - * @see reactor.ipc.netty.http.client.HttpClient + * @see reactor.netty.http.client.HttpClient */ public class ReactorClientHttpConnector implements ClientHttpConnector { @@ -43,20 +44,19 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { /** * Create a Reactor Netty {@link ClientHttpConnector} - * with default {@link ClientOptions} and HTTP compression support enabled. + * with a default configuration and HTTP compression support enabled. */ public ReactorClientHttpConnector() { - this.httpClient = HttpClient.builder() - .options(options -> options.compression(true)) - .build(); + this.httpClient = HttpClient.create() + .compress(); } /** * Create a Reactor Netty {@link ClientHttpConnector} with the given - * {@link HttpClientOptions.Builder} + * {@link HttpClient} */ - public ReactorClientHttpConnector(Consumer clientOptions) { - this.httpClient = HttpClient.create(clientOptions); + public ReactorClientHttpConnector(HttpClient httpClient) { + this.httpClient = httpClient; } @@ -69,22 +69,24 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { } return this.httpClient - .request(adaptHttpMethod(method), - uri.toString(), - request -> requestCallback.apply(adaptRequest(method, uri, request))) - .map(this::adaptResponse); + .request(adaptHttpMethod(method)) + .uri(uri.toString()) + .send((req, out) -> requestCallback.apply(adaptRequest(method, uri, req, out))) + .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc()))) + .next(); } private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) { return io.netty.handler.codec.http.HttpMethod.valueOf(method.name()); } - private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request) { - return new ReactorClientHttpRequest(method, uri, request); + private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound out) { + return new ReactorClientHttpRequest(method, uri, request, out); } - private ClientHttpResponse adaptResponse(HttpClientResponse response) { - return new ReactorClientHttpResponse(response); + private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound, + ByteBufAllocator alloc) { + return new ReactorClientHttpResponse(response, nettyInbound, alloc); } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 3302bc79abc..349447a6225 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -25,7 +25,8 @@ import io.netty.handler.codec.http.cookie.DefaultCookie; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.client.HttpClientRequest; +import reactor.netty.NettyOutbound; +import reactor.netty.http.client.HttpClientRequest; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -38,7 +39,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage; * * @author Brian Clozel * @since 5.0 - * @see reactor.ipc.netty.http.client.HttpClient + * @see reactor.netty.http.client.HttpClient */ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage { @@ -48,15 +49,18 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero private final HttpClientRequest httpRequest; + private final NettyOutbound out; + private final NettyDataBufferFactory bufferFactory; public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, - HttpClientRequest httpRequest) { + HttpClientRequest httpRequest, NettyOutbound out) { this.httpMethod = httpMethod; this.uri = uri; - this.httpRequest = httpRequest.failOnClientError(false).failOnServerError(false); - this.bufferFactory = new NettyDataBufferFactory(httpRequest.alloc()); + this.httpRequest = httpRequest; + this.out = out; + this.bufferFactory = new NettyDataBufferFactory(out.alloc()); } @@ -77,14 +81,14 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero @Override public Mono writeWith(Publisher body) { - return doCommit(() -> this.httpRequest + return doCommit(() -> this.out .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then()); } @Override public Mono writeAndFlushWith(Publisher> body) { Publisher> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs); - return doCommit(() -> this.httpRequest.sendGroups(byteBufs).then()); + return doCommit(() -> this.out.sendGroups(byteBufs).then()); } private static Publisher toByteBufs(Publisher dataBuffers) { @@ -93,12 +97,12 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero @Override public Mono writeWith(File file, long position, long count) { - return doCommit(() -> this.httpRequest.sendFile(file.toPath(), position, count).then()); + return doCommit(() -> this.out.sendFile(file.toPath(), position, count).then()); } @Override public Mono setComplete() { - return doCommit(() -> httpRequest.sendHeaders().then()); + return doCommit(() -> out.then()); } @Override diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index adbf6b1b5f2..9e7b4df5b36 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -19,7 +19,8 @@ package org.springframework.http.client.reactive; import java.util.Collection; import reactor.core.publisher.Flux; -import reactor.ipc.netty.http.client.HttpClientResponse; +import reactor.netty.NettyInbound; +import reactor.netty.http.client.HttpClientResponse; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -30,12 +31,14 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import io.netty.buffer.ByteBufAllocator; + /** * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client. * * @author Brian Clozel * @since 5.0 - * @see reactor.ipc.netty.http.client.HttpClient + * @see reactor.netty.http.client.HttpClient */ class ReactorClientHttpResponse implements ClientHttpResponse { @@ -43,16 +46,21 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private final HttpClientResponse response; + private final NettyInbound nettyInbound; - public ReactorClientHttpResponse(HttpClientResponse response) { + + public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound nettyInbound, + ByteBufAllocator alloc) { this.response = response; - this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc()); + this.nettyInbound = nettyInbound; + this.dataBufferFactory = new NettyDataBufferFactory(alloc); } @Override public Flux getBody() { - return response.receive() + return nettyInbound + .receive() .map(buf -> { buf.retain(); return dataBufferFactory.wrap(buf); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java index a53deaf2ff2..c061a0e2c91 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java @@ -23,8 +23,8 @@ import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.server.HttpServerRequest; -import reactor.ipc.netty.http.server.HttpServerResponse; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpMethod; diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index be5c6572fcd..24df6f9b2f0 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -21,12 +21,12 @@ import java.net.URI; import java.net.URISyntaxException; import javax.net.ssl.SSLSession; -import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.ssl.SslHandler; import reactor.core.publisher.Flux; -import reactor.ipc.netty.http.server.HttpServerRequest; +import reactor.netty.Connection; +import reactor.netty.http.server.HttpServerRequest; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -90,16 +90,14 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest { } } else { - InetSocketAddress localAddress = (InetSocketAddress) request.context().channel().localAddress(); + InetSocketAddress localAddress = request.hostAddress(); return new URI(scheme, null, localAddress.getHostString(), localAddress.getPort(), null, null, null); } } private static String getScheme(HttpServerRequest request) { - ChannelPipeline pipeline = request.context().channel().pipeline(); - boolean ssl = pipeline.get(SslHandler.class) != null; - return ssl ? "https" : "http"; + return request.scheme(); } private static String resolveRequestUri(HttpServerRequest request) { @@ -157,7 +155,7 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest { @Nullable protected SslInfo initSslInfo() { - SslHandler sslHandler = this.request.context().channel().pipeline().get(SslHandler.class); + SslHandler sslHandler = ((Connection) this.request).channel().pipeline().get(SslHandler.class); if (sslHandler != null) { SSLSession session = sslHandler.engine().getSession(); return new DefaultSslInfo(session); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index 7eb2465a180..c46908cd4dd 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -25,7 +25,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.server.HttpServerResponse; +import reactor.netty.http.server.HttpServerResponse; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java index 4ef630fe62e..dba2f4c56a8 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java @@ -18,7 +18,7 @@ package org.springframework.http.server.reactive.bootstrap; import java.util.concurrent.atomic.AtomicReference; -import reactor.ipc.netty.NettyContext; +import reactor.netty.DisposableServer; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; @@ -29,15 +29,17 @@ public class ReactorHttpServer extends AbstractHttpServer { private ReactorHttpHandlerAdapter reactorHandler; - private reactor.ipc.netty.http.server.HttpServer reactorServer; + private reactor.netty.http.server.HttpServer reactorServer; - private AtomicReference nettyContext = new AtomicReference<>(); + private AtomicReference disposableServer = new AtomicReference<>(); @Override protected void initServer() throws Exception { this.reactorHandler = createHttpHandlerAdapter(); - this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(getHost(), getPort()); + this.reactorServer = reactor.netty.http.server.HttpServer.create() + .tcpConfiguration(tcpServer -> tcpServer.host(getHost())) + .port(getPort()); } private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { @@ -46,21 +48,21 @@ public class ReactorHttpServer extends AbstractHttpServer { @Override protected void startInternal() { - NettyContext nettyContext = this.reactorServer.newHandler(this.reactorHandler).block(); - setPort(nettyContext.address().getPort()); - this.nettyContext.set(nettyContext); + DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block(); + setPort(disposableServer.address().getPort()); + this.disposableServer.set(disposableServer); } @Override protected void stopInternal() { - this.nettyContext.get().dispose(); + this.disposableServer.get().dispose(); } @Override protected void resetInternal() { this.reactorServer = null; this.reactorHandler = null; - this.nettyContext.set(null); + this.disposableServer.set(null); } } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java index b7ed8bd7116..8717440ccb8 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java @@ -18,7 +18,7 @@ package org.springframework.http.server.reactive.bootstrap; import java.util.concurrent.atomic.AtomicReference; -import reactor.ipc.netty.NettyContext; +import reactor.netty.DisposableServer; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; @@ -29,17 +29,18 @@ public class ReactorHttpsServer extends AbstractHttpServer { private ReactorHttpHandlerAdapter reactorHandler; - private reactor.ipc.netty.http.server.HttpServer reactorServer; + private reactor.netty.http.server.HttpServer reactorServer; - private AtomicReference nettyContext = new AtomicReference<>(); + private AtomicReference disposableServer = new AtomicReference<>(); @Override protected void initServer() throws Exception { this.reactorHandler = createHttpHandlerAdapter(); - this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(builder -> { - builder.host(getHost()).port(getPort()).sslSelfSigned(); - }); + this.reactorServer = reactor.netty.http.server.HttpServer.create() + .tcpConfiguration(tcpServer -> tcpServer.host(getHost()) + .secure()) + .port(getPort()); } private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { @@ -48,21 +49,21 @@ public class ReactorHttpsServer extends AbstractHttpServer { @Override protected void startInternal() { - NettyContext nettyContext = this.reactorServer.newHandler(this.reactorHandler).block(); - setPort(nettyContext.address().getPort()); - this.nettyContext.set(nettyContext); + DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block(); + setPort(disposableServer.address().getPort()); + this.disposableServer.set(disposableServer); } @Override protected void stopInternal() { - this.nettyContext.get().dispose(); + this.disposableServer.get().dispose(); } @Override protected void resetInternal() { this.reactorServer = null; this.reactorHandler = null; - this.nettyContext.set(null); + this.disposableServer.set(null); } } diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index 29133a72f81..c3697b994a2 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -28,7 +28,7 @@ dependencies { optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${jackson2Version}") optional("io.reactivex:rxjava:${rxjavaVersion}") optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") - optional("io.projectreactor.ipc:reactor-netty") + optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT") optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") { exclude group: "org.apache.tomcat", module: "tomcat-websocket-api" exclude group: "org.apache.tomcat", module: "tomcat-servlet-api" diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index dcf00451c74..14b693c4de9 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -20,11 +20,11 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.ipc.netty.NettyInbound; -import reactor.ipc.netty.NettyOutbound; -import reactor.ipc.netty.NettyPipeline; -import reactor.ipc.netty.http.websocket.WebsocketInbound; -import reactor.ipc.netty.http.websocket.WebsocketOutbound; +import reactor.netty.NettyInbound; +import reactor.netty.NettyOutbound; +import reactor.netty.NettyPipeline; +import reactor.netty.http.websocket.WebsocketInbound; +import reactor.netty.http.websocket.WebsocketOutbound; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java index 42fa2ec8d1b..0699e2cebe0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java @@ -21,9 +21,7 @@ import java.util.function.Consumer; import io.netty.buffer.ByteBufAllocator; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.client.HttpClient; -import reactor.ipc.netty.http.client.HttpClientOptions; -import reactor.ipc.netty.http.client.HttpClientResponse; +import reactor.netty.http.client.HttpClient; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; @@ -48,15 +46,14 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen * Default constructor. */ public ReactorNettyWebSocketClient() { - this(options -> {}); + this(HttpClient.create()); } /** - * Constructor that accepts an {@link HttpClientOptions.Builder} consumer - * to supply to {@link HttpClient#create(Consumer)}. + * Constructor that accepts an existing {@link HttpClient} builder. */ - public ReactorNettyWebSocketClient(Consumer clientOptions) { - this.httpClient = HttpClient.create(clientOptions); + public ReactorNettyWebSocketClient(HttpClient httpClient) { + this.httpClient = httpClient; } @@ -78,29 +75,28 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen List protocols = beforeHandshake(url, headers, handler); return getHttpClient() - .ws(url.toString(), - nettyHeaders -> setNettyHeaders(headers, nettyHeaders), - StringUtils.collectionToCommaDelimitedString(protocols)) - .flatMap(response -> { - HandshakeInfo info = afterHandshake(url, toHttpHeaders(response)); - ByteBufAllocator allocator = response.channel().alloc(); + .headers(nettyHeaders -> setNettyHeaders(headers, nettyHeaders)) + .websocket(StringUtils.collectionToCommaDelimitedString(protocols)) + .uri(url.toString()) + .handle((in, out) -> { + HandshakeInfo info = afterHandshake(url, toHttpHeaders(in.headers())); + ByteBufAllocator allocator = out.alloc(); NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator); - return response.receiveWebsocket((in, out) -> { - WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory); - return handler.handle(session); - }); - }); + WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory); + return handler.handle(session); + }) + .next(); } private void setNettyHeaders(HttpHeaders headers, io.netty.handler.codec.http.HttpHeaders nettyHeaders) { headers.forEach(nettyHeaders::set); } - private HttpHeaders toHttpHeaders(HttpClientResponse response) { + private HttpHeaders toHttpHeaders(io.netty.handler.codec.http.HttpHeaders responseHeaders) { HttpHeaders headers = new HttpHeaders(); - response.responseHeaders().forEach(entry -> { + responseHeaders.forEach(entry -> { String name = entry.getKey(); - headers.put(name, response.responseHeaders().getAll(name)); + headers.put(name, responseHeaders.getAll(name)); }); return headers; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java index 8184d1d546d..91910ade4d4 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java @@ -75,7 +75,7 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle { HandshakeWebSocketService.class.getClassLoader()); private static final boolean reactorNettyPresent = ClassUtils.isPresent( - "reactor.ipc.netty.http.server.HttpServerResponse", + "reactor.netty.http.server.HttpServerResponse", HandshakeWebSocketService.class.getClassLoader()); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java index 8d8af05ad75..94da3974554 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java @@ -19,7 +19,7 @@ package org.springframework.web.reactive.socket.server.upgrade; import java.util.function.Supplier; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.server.HttpServerResponse; +import reactor.netty.http.server.HttpServerResponse; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.server.reactive.AbstractServerHttpResponse; diff --git a/spring-websocket/spring-websocket.gradle b/spring-websocket/spring-websocket.gradle index a0fe52d7317..bba796f1887 100644 --- a/spring-websocket/spring-websocket.gradle +++ b/spring-websocket/spring-websocket.gradle @@ -44,5 +44,5 @@ dependencies { optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}") - testCompile("io.projectreactor.ipc:reactor-netty") + testCompile("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT") } diff --git a/src/docs/asciidoc/web/webflux.adoc b/src/docs/asciidoc/web/webflux.adoc index 5d9c8fbf932..821effc87a4 100644 --- a/src/docs/asciidoc/web/webflux.adoc +++ b/src/docs/asciidoc/web/webflux.adoc @@ -366,7 +366,7 @@ and code snippets for each server: |Server name|Group id|Artifact name |Reactor Netty -|io.projectreactor.ipc +|io.projectreactor.netty |reactor-netty |Undertow