Upgrade to reactor-netty/-ipc to 0.6 snapshots

This commit is contained in:
Stephane Maldini 2016-11-04 22:47:23 +00:00 committed by Rossen Stoyanchev
parent 57130b2d10
commit 099f5a254e
9 changed files with 82 additions and 75 deletions

View File

@ -78,7 +78,7 @@ configure(allprojects) { project ->
ext.reactivestreamsVersion = "1.0.0" ext.reactivestreamsVersion = "1.0.0"
ext.reactorVersion = "2.0.8.RELEASE" ext.reactorVersion = "2.0.8.RELEASE"
ext.reactorCoreVersion = '3.0.3.RELEASE' ext.reactorCoreVersion = '3.0.3.RELEASE'
ext.reactorNettyVersion = '0.5.2.RELEASE' ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT'
ext.romeVersion = "1.7.0" ext.romeVersion = "1.7.0"
ext.rxjavaVersion = '1.2.2' ext.rxjavaVersion = '1.2.2'
ext.rxjavaAdapterVersion = '1.2.1' ext.rxjavaAdapterVersion = '1.2.1'
@ -171,6 +171,7 @@ configure(allprojects) { project ->
repositories { repositories {
maven { url "https://repo.spring.io/libs-release" } maven { url "https://repo.spring.io/libs-release" }
maven { url "https://repo.spring.io/milestone" } maven { url "https://repo.spring.io/milestone" }
maven { url "https://repo.spring.io/snapshot" }
} }
dependencies { dependencies {

View File

@ -1,4 +1,4 @@
#Mon Aug 15 21:03:22 CEST 2016 #Fri Nov 04 16:30:57 GMT 2016
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME

View File

@ -17,13 +17,14 @@
package org.springframework.http.client.reactive; package org.springframework.http.client.reactive;
import java.net.URI; import java.net.URI;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.config.ClientOptions; import reactor.ipc.netty.http.client.HttpClientOptions;
import reactor.ipc.netty.http.HttpClient; import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.http.HttpException; import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.http.HttpInbound; import reactor.ipc.netty.http.client.HttpClientException;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
@ -44,13 +45,13 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
* and SSL support enabled. * and SSL support enabled.
*/ */
public ReactorClientHttpConnector() { public ReactorClientHttpConnector() {
this(ClientOptions.create().sslSupport()); this.httpClient = HttpClient.create();
} }
/** /**
* Create a Reactor Netty {@link ClientHttpConnector} with the given {@link ClientOptions} * Create a Reactor Netty {@link ClientHttpConnector} with the given {@link ClientOptions}
*/ */
public ReactorClientHttpConnector(ClientOptions clientOptions) { public ReactorClientHttpConnector(Consumer<? super HttpClientOptions> clientOptions) {
this.httpClient = HttpClient.create(clientOptions); this.httpClient = HttpClient.create(clientOptions);
} }
@ -64,8 +65,7 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
uri.toString(), uri.toString(),
httpClientRequest -> requestCallback httpClientRequest -> requestCallback
.apply(new ReactorClientHttpRequest(method, uri, httpClientRequest))) .apply(new ReactorClientHttpRequest(method, uri, httpClientRequest)))
.cast(HttpInbound.class) .otherwise(HttpClientException.class, exc -> Mono.just(exc.getResponse()))
.otherwise(HttpException.class, exc -> Mono.just(exc.getChannel()))
.map(ReactorClientHttpResponse::new); .map(ReactorClientHttpResponse::new);
} }

View File

@ -24,7 +24,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpClientRequest; import reactor.ipc.netty.http.client.HttpClientRequest;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
@ -36,7 +36,7 @@ import org.springframework.http.HttpMethod;
* *
* @author Brian Clozel * @author Brian Clozel
* @since 5.0 * @since 5.0
* @see reactor.ipc.netty.http.HttpClient * @see reactor.ipc.netty.http.client.HttpClient
*/ */
public class ReactorClientHttpRequest extends AbstractClientHttpRequest { public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
@ -54,7 +54,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
this.httpMethod = httpMethod; this.httpMethod = httpMethod;
this.uri = uri; this.uri = uri;
this.httpRequest = httpRequest; this.httpRequest = httpRequest;
this.bufferFactory = new NettyDataBufferFactory(httpRequest.delegate().alloc()); this.bufferFactory = new NettyDataBufferFactory(httpRequest.channel().alloc());
} }
@ -84,7 +84,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body). Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).
map(ReactorClientHttpRequest::toByteBufs); map(ReactorClientHttpRequest::toByteBufs);
return applyBeforeCommit().then(this.httpRequest return applyBeforeCommit().then(this.httpRequest
.sendAndFlush(byteBufs)); .sendGroups(byteBufs));
} }
private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) { private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) {
@ -100,7 +100,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
@Override @Override
protected void writeHeaders() { protected void writeHeaders() {
getHeaders().entrySet() getHeaders().entrySet()
.forEach(e -> this.httpRequest.headers().set(e.getKey(), e.getValue())); .forEach(e -> this.httpRequest.requestHeaders().set(e.getKey(), e.getValue()));
} }
@Override @Override

View File

@ -19,7 +19,7 @@ package org.springframework.http.client.reactive;
import java.util.Collection; import java.util.Collection;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.ipc.netty.http.HttpInbound; import reactor.ipc.netty.http.client.HttpClientResponse;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -34,19 +34,19 @@ import org.springframework.util.MultiValueMap;
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client. * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
* *
* @author Brian Clozel * @author Brian Clozel
* @see reactor.ipc.netty.http.HttpClient * @see reactor.ipc.netty.http.client.HttpClient
* @since 5.0 * @since 5.0
*/ */
public class ReactorClientHttpResponse implements ClientHttpResponse { public class ReactorClientHttpResponse implements ClientHttpResponse {
private final NettyDataBufferFactory dataBufferFactory; private final NettyDataBufferFactory dataBufferFactory;
private final HttpInbound response; private final HttpClientResponse response;
public ReactorClientHttpResponse(HttpInbound response) { public ReactorClientHttpResponse(HttpClientResponse response) {
this.response = response; this.response = response;
this.dataBufferFactory = new NettyDataBufferFactory(response.delegate().alloc()); this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc());
} }
@ -62,7 +62,9 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
@Override @Override
public HttpHeaders getHeaders() { public HttpHeaders getHeaders() {
HttpHeaders headers = new HttpHeaders(); HttpHeaders headers = new HttpHeaders();
this.response.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue())); this.response.responseHeaders()
.entries()
.forEach(e -> headers.add(e.getKey(), e.getValue()));
return headers; return headers;
} }

View File

@ -17,11 +17,12 @@
package org.springframework.http.server.reactive; package org.springframework.http.server.reactive;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.BiFunction;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpChannel; import reactor.ipc.netty.http.server.HttpServerRequest;
import reactor.ipc.netty.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -32,7 +33,7 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory;
* @since 5.0 * @since 5.0
*/ */
public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport
implements Function<HttpChannel, Mono<Void>> { implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) { public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
@ -45,16 +46,18 @@ public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport
@Override @Override
public Mono<Void> apply(HttpChannel channel) { public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(channel.delegate().alloc()); NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(request.channel()
ReactorServerHttpRequest request = new ReactorServerHttpRequest(channel, bufferFactory); .alloc());
ReactorServerHttpResponse response = new ReactorServerHttpResponse(channel, bufferFactory); ReactorServerHttpRequest req = new ReactorServerHttpRequest(request, bufferFactory);
ReactorServerHttpResponse resp = new ReactorServerHttpResponse(response,
bufferFactory);
return getHttpHandler().handle(request, response) return getHttpHandler().handle(req, resp)
.otherwise(ex -> { .otherwise(ex -> {
logger.error("Could not complete request", ex); logger.error("Could not complete request", ex);
channel.status(HttpResponseStatus.INTERNAL_SERVER_ERROR); response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
return Mono.empty(); return Mono.empty();
}) })
.doOnSuccess(aVoid -> logger.debug("Successfully completed request")); .doOnSuccess(aVoid -> logger.debug("Successfully completed request"));

View File

@ -22,7 +22,7 @@ import java.net.URISyntaxException;
import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.Cookie;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.ipc.netty.http.HttpChannel; import reactor.ipc.netty.http.server.HttpServerRequest;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -34,7 +34,7 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
/** /**
* Adapt {@link ServerHttpRequest} to the Reactor Net {@link HttpChannel}. * Adapt {@link ServerHttpRequest} to the Reactor {@link HttpServerRequest}.
* *
* @author Stephane Maldini * @author Stephane Maldini
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
@ -42,19 +42,19 @@ import org.springframework.util.MultiValueMap;
*/ */
public class ReactorServerHttpRequest extends AbstractServerHttpRequest { public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
private final HttpChannel channel; private final HttpServerRequest request;
private final NettyDataBufferFactory bufferFactory; private final NettyDataBufferFactory bufferFactory;
public ReactorServerHttpRequest(HttpChannel channel, NettyDataBufferFactory bufferFactory) { public ReactorServerHttpRequest(HttpServerRequest request, NettyDataBufferFactory bufferFactory) {
super(initUri(channel), initHeaders(channel)); super(initUri(request), initHeaders(request));
Assert.notNull(bufferFactory, "'bufferFactory' must not be null"); Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
this.channel = channel; this.request = request;
this.bufferFactory = bufferFactory; this.bufferFactory = bufferFactory;
} }
private static URI initUri(HttpChannel channel) { private static URI initUri(HttpServerRequest channel) {
Assert.notNull("'channel' must not be null"); Assert.notNull("'channel' must not be null");
try { try {
URI uri = new URI(channel.uri()); URI uri = new URI(channel.uri());
@ -73,29 +73,29 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
} }
} }
private static HttpHeaders initHeaders(HttpChannel channel) { private static HttpHeaders initHeaders(HttpServerRequest channel) {
HttpHeaders headers = new HttpHeaders(); HttpHeaders headers = new HttpHeaders();
for (String name : channel.headers().names()) { for (String name : channel.requestHeaders().names()) {
headers.put(name, channel.headers().getAll(name)); headers.put(name, channel.requestHeaders().getAll(name));
} }
return headers; return headers;
} }
public HttpChannel getReactorChannel() { public HttpServerRequest getReactorRequest() {
return this.channel; return this.request;
} }
@Override @Override
public HttpMethod getMethod() { public HttpMethod getMethod() {
return HttpMethod.valueOf(this.channel.method().name()); return HttpMethod.valueOf(this.request.method().name());
} }
@Override @Override
protected MultiValueMap<String, HttpCookie> initCookies() { protected MultiValueMap<String, HttpCookie> initCookies() {
MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>(); MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
for (CharSequence name : this.channel.cookies().keySet()) { for (CharSequence name : this.request.cookies().keySet()) {
for (Cookie cookie : this.channel.cookies().get(name)) { for (Cookie cookie : this.request.cookies().get(name)) {
HttpCookie httpCookie = new HttpCookie(name.toString(), cookie.value()); HttpCookie httpCookie = new HttpCookie(name.toString(), cookie.value());
cookies.add(name.toString(), httpCookie); cookies.add(name.toString(), httpCookie);
} }
@ -105,7 +105,7 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
@Override @Override
public Flux<DataBuffer> getBody() { public Flux<DataBuffer> getBody() {
return this.channel.receive().retain().map(this.bufferFactory::wrap); return this.request.receive().retain().map(this.bufferFactory::wrap);
} }
} }

View File

@ -25,7 +25,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpChannel; import reactor.ipc.netty.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
@ -37,7 +37,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* Adapt {@link ServerHttpResponse} to the Reactor Net {@link HttpChannel}. * Adapt {@link ServerHttpResponse} to the {@link HttpServerResponse}.
* *
* @author Stephane Maldini * @author Stephane Maldini
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
@ -46,18 +46,18 @@ import org.springframework.util.Assert;
public class ReactorServerHttpResponse extends AbstractServerHttpResponse public class ReactorServerHttpResponse extends AbstractServerHttpResponse
implements ZeroCopyHttpOutputMessage { implements ZeroCopyHttpOutputMessage {
private final HttpChannel channel; private final HttpServerResponse response;
public ReactorServerHttpResponse(HttpChannel response, DataBufferFactory bufferFactory) { public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) {
super(bufferFactory); super(bufferFactory);
Assert.notNull("'response' must not be null."); Assert.notNull("'response' must not be null.");
this.channel = response; this.response = response;
} }
public HttpChannel getReactorChannel() { public HttpServerResponse getReactorResponse() {
return this.channel; return this.response;
} }
@ -65,32 +65,32 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
protected void applyStatusCode() { protected void applyStatusCode() {
HttpStatus statusCode = this.getStatusCode(); HttpStatus statusCode = this.getStatusCode();
if (statusCode != null) { if (statusCode != null) {
getReactorChannel().status(HttpResponseStatus.valueOf(statusCode.value())); getReactorResponse().status(HttpResponseStatus.valueOf(statusCode.value()));
} }
} }
@Override @Override
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) { protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
Publisher<ByteBuf> body = toByteBufs(publisher); Publisher<ByteBuf> body = toByteBufs(publisher);
return this.channel.send(body); return this.response.send(body);
} }
@Override @Override
protected Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> publisher) { protected Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> publisher) {
Publisher<Publisher<ByteBuf>> body = Flux.from(publisher) Publisher<Publisher<ByteBuf>> body = Flux.from(publisher)
.map(ReactorServerHttpResponse::toByteBufs); .map(ReactorServerHttpResponse::toByteBufs);
return this.channel.sendAndFlush(body); return this.response.sendGroups(body);
} }
@Override @Override
protected void applyHeaders() { protected void applyHeaders() {
// TODO: temporarily, see https://github.com/reactor/reactor-netty/issues/2 // TODO: temporarily, see https://github.com/reactor/reactor-netty/issues/2
if(getHeaders().containsKey(HttpHeaders.CONTENT_LENGTH)){ if(getHeaders().containsKey(HttpHeaders.CONTENT_LENGTH)){
this.channel.responseTransfer(false); this.response.disableChunkedTransfer();
} }
for (String name : getHeaders().keySet()) { for (String name : getHeaders().keySet()) {
for (String value : getHeaders().get(name)) { for (String value : getHeaders().get(name)) {
this.channel.responseHeaders().add(name, value); this.response.responseHeaders().add(name, value);
} }
} }
} }
@ -107,14 +107,14 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
httpCookie.getPath().ifPresent(cookie::setPath); httpCookie.getPath().ifPresent(cookie::setPath);
cookie.setSecure(httpCookie.isSecure()); cookie.setSecure(httpCookie.isSecure());
cookie.setHttpOnly(httpCookie.isHttpOnly()); cookie.setHttpOnly(httpCookie.isHttpOnly());
this.channel.addResponseCookie(cookie); this.response.addCookie(cookie);
} }
} }
} }
@Override @Override
public Mono<Void> writeWith(File file, long position, long count) { public Mono<Void> writeWith(File file, long position, long count) {
return doCommit(() -> this.channel.sendFile(file, position, count)); return doCommit(() -> this.response.sendFile(file, position, count));
} }
private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) { private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) {

View File

@ -17,6 +17,7 @@
package org.springframework.http.server.reactive.bootstrap; package org.springframework.http.server.reactive.bootstrap;
import reactor.core.Loopback; import reactor.core.Loopback;
import reactor.ipc.netty.NettyContext;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -28,9 +29,9 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer,
private ReactorHttpHandlerAdapter reactorHandler; private ReactorHttpHandlerAdapter reactorHandler;
private reactor.ipc.netty.http.HttpServer reactorServer; private reactor.ipc.netty.http.server.HttpServer reactorServer;
private boolean running; private NettyContext running;
@Override @Override
@ -42,13 +43,16 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer,
Assert.notNull(getHttpHandler()); Assert.notNull(getHttpHandler());
this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler()); this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler());
} }
this.reactorServer = reactor.ipc.netty.http.HttpServer.create(getHost(), getPort()); this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(getHost(),
getPort());
} }
@Override @Override
public boolean isRunning() { public boolean isRunning() {
return this.running; NettyContext running = this.running;
return running != null && running.channel()
.isActive();
} }
@Override @Override
@ -63,22 +67,19 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer,
@Override @Override
public void start() { public void start() {
if (!this.running) { //Should be made thread-safe (compareAndSet..)
try { if (this.running == null) {
this.reactorServer.startAndAwait(reactorHandler); this.running = this.reactorServer.newHandler(reactorHandler)
this.running = true; .block();
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
} }
} }
@Override @Override
public void stop() { public void stop() {
if (this.running) { NettyContext running = this.running;
this.reactorServer.shutdown(); if (running != null) {
this.running = false; this.running = null;
running.dispose();
} }
} }
} }