From 0e5a892bad25a3ebcf018763c46fc6c8cfa15de8 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Mon, 18 Apr 2016 17:49:32 +0100 Subject: [PATCH] Use ByteBuf instead of Buffer in reactor-netty --- .../reactive/ReactorClientHttpRequest.java | 25 +++++++++++----- .../reactive/ReactorClientHttpResponse.java | 3 +- .../ReactorHttpClientRequestFactory.java | 14 +++------ .../reactive/ReactorHttpHandlerAdapter.java | 15 +++++----- .../reactive/ReactorServerHttpRequest.java | 10 +++---- .../reactive/ReactorServerHttpResponse.java | 15 ++++++++-- .../reactive/boot/ReactorHttpServer.java | 11 +------ .../reactive/WebClientIntegrationTests.java | 29 +++++++++---------- 8 files changed, 61 insertions(+), 61 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 16a192a5bc..a8331a50aa 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -19,15 +19,19 @@ package org.springframework.http.client.reactive; import java.net.URI; import java.util.Collection; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.cookie.DefaultCookie; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.io.buffer.Buffer; import reactor.io.netty.http.HttpClient; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.core.io.buffer.DefaultDataBufferAllocator; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferAllocator; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -47,13 +51,13 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { private final HttpClient httpClient; - private Flux body; + private Flux body; - public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers, - DataBufferAllocator allocator) { + public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers) { super(headers); - this.allocator = allocator; + //FIXME use Netty allocator + this.allocator = new DefaultDataBufferAllocator(); this.httpMethod = httpMethod; this.uri = uri; this.httpClient = httpClient; @@ -88,7 +92,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono setBody(Publisher body) { - this.body = Flux.from(body).map(b -> new Buffer(b.asByteBuffer())); + this.body = Flux.from(body).map(this::toByteBuf); return Mono.empty(); } @@ -121,7 +125,14 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { .map(httpChannel -> new ReactorClientHttpResponse(httpChannel, allocator)); } - + private ByteBuf toByteBuf(DataBuffer buffer) { + if (buffer instanceof NettyDataBuffer) { + return ((NettyDataBuffer) buffer).getNativeBuffer(); + } + else { + return Unpooled.wrappedBuffer(buffer.asByteBuffer()); + } + } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 9b3757caf2..8e687132b7 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -23,6 +23,7 @@ import reactor.io.netty.http.HttpInbound; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.core.io.buffer.NettyDataBufferAllocator; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; @@ -50,7 +51,7 @@ public class ReactorClientHttpResponse implements ClientHttpResponse { @Override public Flux getBody() { - return channel.receive().map(b -> allocator.wrap(b.byteBuffer())); + return channel.receiveByteBuffer().map(allocator::wrap); } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java index ce0cd81812..ad1a9aad42 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java @@ -22,6 +22,7 @@ import reactor.io.netty.http.HttpClient; import org.springframework.core.io.buffer.DataBufferAllocator; import org.springframework.core.io.buffer.DefaultDataBufferAllocator; +import org.springframework.core.io.buffer.NettyDataBufferAllocator; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.util.Assert; @@ -33,20 +34,13 @@ import org.springframework.util.Assert; */ public class ReactorHttpClientRequestFactory implements ClientHttpRequestFactory { - private final DataBufferAllocator allocator; - private final HttpClient httpClient; public ReactorHttpClientRequestFactory() { - this(new DefaultDataBufferAllocator()); + this(reactor.io.netty.http.HttpClient.create()); } - public ReactorHttpClientRequestFactory(DataBufferAllocator allocator) { - this(allocator, reactor.io.netty.http.HttpClient.create()); - } - - protected ReactorHttpClientRequestFactory(DataBufferAllocator allocator, HttpClient httpClient) { - this.allocator = allocator; + protected ReactorHttpClientRequestFactory(HttpClient httpClient) { this.httpClient = httpClient; } @@ -56,7 +50,7 @@ public class ReactorHttpClientRequestFactory implements ClientHttpRequestFactory Assert.notNull(uri, "request URI is required"); Assert.notNull(headers, "request headers are required"); - return new ReactorClientHttpRequest(httpMethod, uri, this.httpClient, headers, this.allocator); + return new ReactorClientHttpRequest(httpMethod, uri, this.httpClient, headers); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java index f6abc3bb16..24358893b4 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java @@ -16,33 +16,32 @@ package org.springframework.http.server.reactive; +import io.netty.buffer.ByteBuf; import reactor.core.publisher.Mono; -import reactor.io.buffer.Buffer; import reactor.io.ipc.ChannelHandler; import reactor.io.netty.http.HttpChannel; -import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.core.io.buffer.NettyDataBufferAllocator; import org.springframework.util.Assert; /** * @author Stephane Maldini */ public class ReactorHttpHandlerAdapter - implements ChannelHandler { + implements ChannelHandler { private final HttpHandler httpHandler; - private final DataBufferAllocator allocator; - - public ReactorHttpHandlerAdapter(HttpHandler httpHandler, - DataBufferAllocator allocator) { + public ReactorHttpHandlerAdapter(HttpHandler httpHandler) { Assert.notNull(httpHandler, "'httpHandler' is required."); this.httpHandler = httpHandler; - this.allocator = allocator; } @Override public Mono apply(HttpChannel channel) { + NettyDataBufferAllocator allocator = + new NettyDataBufferAllocator(channel.delegate().alloc()); + ReactorServerHttpRequest adaptedRequest = new ReactorServerHttpRequest(channel, allocator); ReactorServerHttpResponse adaptedResponse = diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index 5183fca307..ba52096395 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -25,6 +25,7 @@ import reactor.io.netty.http.HttpChannel; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.core.io.buffer.NettyDataBufferAllocator; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -41,10 +42,10 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest { private final HttpChannel channel; - private final DataBufferAllocator allocator; + private final NettyDataBufferAllocator allocator; public ReactorServerHttpRequest(HttpChannel request, - DataBufferAllocator allocator) { + NettyDataBufferAllocator allocator) { Assert.notNull("'request' must not be null"); Assert.notNull(allocator, "'allocator' must not be null"); this.channel = request; @@ -89,10 +90,7 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest { @Override public Flux getBody() { - return Flux.from(this.channel.receive()).map(bytes -> { - ByteBuffer byteBuffer = bytes.byteBuffer(); - return allocator.wrap(byteBuffer); - }); + return this.channel.receive().map(allocator::wrap); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index fbd9f7661a..8eabb39314 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -16,17 +16,19 @@ package org.springframework.http.server.reactive; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.DefaultCookie; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.io.buffer.Buffer; import reactor.io.netty.http.HttpChannel; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.util.Assert; @@ -60,8 +62,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse { @Override protected Mono setBodyInternal(Publisher publisher) { - return Mono.from(this.channel.send( - Flux.from(publisher).map(buffer -> new Buffer(buffer.asByteBuffer())))); + return this.channel.send(Flux.from(publisher).map(this::toByteBuf)); } @Override @@ -90,4 +91,12 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse { } } + private ByteBuf toByteBuf(DataBuffer buffer) { + if (buffer instanceof NettyDataBuffer) { + return ((NettyDataBuffer) buffer).getNativeBuffer(); + } + else { + return Unpooled.wrappedBuffer(buffer.asByteBuffer()); + } + } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/ReactorHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/ReactorHttpServer.java index 82b1e08dec..b83e2d0fa0 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/ReactorHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/ReactorHttpServer.java @@ -18,10 +18,7 @@ package org.springframework.http.server.reactive.boot; import reactor.core.flow.Loopback; import reactor.core.state.Completable; -import reactor.io.buffer.Buffer; -import org.springframework.core.io.buffer.DataBufferAllocator; -import org.springframework.core.io.buffer.DefaultDataBufferAllocator; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.util.Assert; @@ -35,19 +32,13 @@ public class ReactorHttpServer extends HttpServerSupport private reactor.io.netty.http.HttpServer reactorServer; - private DataBufferAllocator allocator = new DefaultDataBufferAllocator(); - private boolean running; - public void setAllocator(DataBufferAllocator allocator) { - this.allocator = allocator; - } - @Override public void afterPropertiesSet() throws Exception { Assert.notNull(getHttpHandler()); - this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler(), allocator); + this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler()); this.reactorServer = (getPort() != -1 ? reactor.io.netty.http.HttpServer.create(getPort()) : reactor.io.netty.http.HttpServer.create()); diff --git a/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java index 5f37a963fb..7557e84082 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java @@ -65,7 +65,7 @@ public class WebClientIntegrationTests { .perform(get(baseUrl.toString())) .extract(headers()); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); result.subscribe(ts); ts.awaitAndAssertNextValuesWith( httpHeaders -> { @@ -92,7 +92,7 @@ public class WebClientIntegrationTests { .extract(body(String.class)); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); result.subscribe(ts); ts.awaitAndAssertNextValues("Hello Spring!").assertComplete(); @@ -114,15 +114,12 @@ public class WebClientIntegrationTests { .accept(MediaType.TEXT_PLAIN)) .extract(response(String.class)); - TestSubscriber> ts = new TestSubscriber(); + TestSubscriber> ts = new TestSubscriber<>(); result.subscribe(ts); - ts.awaitAndAssertNextValuesWith(new Consumer>() { - @Override - public void accept(ResponseEntity response) { - assertEquals(200, response.getStatusCode().value()); - assertEquals(MediaType.TEXT_PLAIN, response.getHeaders().getContentType()); - assertEquals("Hello Spring!", response.getBody()); - } + ts.awaitAndAssertNextValuesWith((Consumer>) response -> { + assertEquals(200, response.getStatusCode().value()); + assertEquals(MediaType.TEXT_PLAIN, response.getHeaders().getContentType()); + assertEquals("Hello Spring!", response.getBody()); }); RecordedRequest request = server.takeRequest(); assertEquals(1, server.getRequestCount()); @@ -143,7 +140,7 @@ public class WebClientIntegrationTests { .accept(MediaType.APPLICATION_JSON)) .extract(body(String.class)); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); result.subscribe(ts); ts.awaitAndAssertNextValues(content).assertComplete(); RecordedRequest request = server.takeRequest(); @@ -164,7 +161,7 @@ public class WebClientIntegrationTests { .accept(MediaType.APPLICATION_JSON)) .extract(body(Pojo.class)); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); result.subscribe(ts); ts.awaitAndAssertNextValuesWith(p -> assertEquals("barbar", p.getBar())).assertComplete(); RecordedRequest request = server.takeRequest(); @@ -185,7 +182,7 @@ public class WebClientIntegrationTests { .accept(MediaType.APPLICATION_JSON)) .extract(bodyStream(Pojo.class)); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); result.subscribe(ts); ts.awaitAndAssertNextValuesWith( p -> assertThat(p.getBar(), Matchers.is("bar1")), @@ -209,7 +206,7 @@ public class WebClientIntegrationTests { .accept(MediaType.APPLICATION_JSON)) .extract(responseStream(Pojo.class)); - TestSubscriber>> ts = new TestSubscriber(); + TestSubscriber>> ts = new TestSubscriber<>(); result.subscribe(ts); ts.awaitAndAssertNextValuesWith( response -> { @@ -237,7 +234,7 @@ public class WebClientIntegrationTests { .accept(MediaType.APPLICATION_JSON)) .extract(body(Pojo.class)); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); result.subscribe(ts); ts.awaitAndAssertNextValuesWith(p -> assertEquals("BARBAR", p.getBar())).assertComplete(); @@ -261,7 +258,7 @@ public class WebClientIntegrationTests { .extract(body(String.class)); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); result.subscribe(ts); // TODO: error message should be converted to a ClientException ts.await().assertError();