Use ByteBuf instead of Buffer in reactor-netty

This commit is contained in:
Stephane Maldini 2016-04-18 17:49:32 +01:00
parent ad9d8c28fe
commit 0e5a892bad
8 changed files with 61 additions and 61 deletions

View File

@ -19,15 +19,19 @@ package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.Collection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import reactor.io.netty.http.HttpClient;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -47,13 +51,13 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
private final HttpClient httpClient;
private Flux<Buffer> body;
private Flux<ByteBuf> body;
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers,
DataBufferAllocator allocator) {
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers) {
super(headers);
this.allocator = allocator;
//FIXME use Netty allocator
this.allocator = new DefaultDataBufferAllocator();
this.httpMethod = httpMethod;
this.uri = uri;
this.httpClient = httpClient;
@ -88,7 +92,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono<Void> setBody(Publisher<DataBuffer> body) {
this.body = Flux.from(body).map(b -> new Buffer(b.asByteBuffer()));
this.body = Flux.from(body).map(this::toByteBuf);
return Mono.empty();
}
@ -121,7 +125,14 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
.map(httpChannel -> new ReactorClientHttpResponse(httpChannel, allocator));
}
private ByteBuf toByteBuf(DataBuffer buffer) {
if (buffer instanceof NettyDataBuffer) {
return ((NettyDataBuffer) buffer).getNativeBuffer();
}
else {
return Unpooled.wrappedBuffer(buffer.asByteBuffer());
}
}
}

View File

@ -23,6 +23,7 @@ import reactor.io.netty.http.HttpInbound;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
@ -50,7 +51,7 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
public Flux<DataBuffer> getBody() {
return channel.receive().map(b -> allocator.wrap(b.byteBuffer()));
return channel.receiveByteBuffer().map(allocator::wrap);
}
@Override

View File

@ -22,6 +22,7 @@ import reactor.io.netty.http.HttpClient;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
@ -33,20 +34,13 @@ import org.springframework.util.Assert;
*/
public class ReactorHttpClientRequestFactory implements ClientHttpRequestFactory {
private final DataBufferAllocator allocator;
private final HttpClient httpClient;
public ReactorHttpClientRequestFactory() {
this(new DefaultDataBufferAllocator());
this(reactor.io.netty.http.HttpClient.create());
}
public ReactorHttpClientRequestFactory(DataBufferAllocator allocator) {
this(allocator, reactor.io.netty.http.HttpClient.create());
}
protected ReactorHttpClientRequestFactory(DataBufferAllocator allocator, HttpClient httpClient) {
this.allocator = allocator;
protected ReactorHttpClientRequestFactory(HttpClient httpClient) {
this.httpClient = httpClient;
}
@ -56,7 +50,7 @@ public class ReactorHttpClientRequestFactory implements ClientHttpRequestFactory
Assert.notNull(uri, "request URI is required");
Assert.notNull(headers, "request headers are required");
return new ReactorClientHttpRequest(httpMethod, uri, this.httpClient, headers, this.allocator);
return new ReactorClientHttpRequest(httpMethod, uri, this.httpClient, headers);
}
}

View File

@ -16,33 +16,32 @@
package org.springframework.http.server.reactive;
import io.netty.buffer.ByteBuf;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import reactor.io.ipc.ChannelHandler;
import reactor.io.netty.http.HttpChannel;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.util.Assert;
/**
* @author Stephane Maldini
*/
public class ReactorHttpHandlerAdapter
implements ChannelHandler<Buffer, Buffer, HttpChannel> {
implements ChannelHandler<ByteBuf, ByteBuf, HttpChannel> {
private final HttpHandler httpHandler;
private final DataBufferAllocator allocator;
public ReactorHttpHandlerAdapter(HttpHandler httpHandler,
DataBufferAllocator allocator) {
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "'httpHandler' is required.");
this.httpHandler = httpHandler;
this.allocator = allocator;
}
@Override
public Mono<Void> apply(HttpChannel channel) {
NettyDataBufferAllocator allocator =
new NettyDataBufferAllocator(channel.delegate().alloc());
ReactorServerHttpRequest adaptedRequest =
new ReactorServerHttpRequest(channel, allocator);
ReactorServerHttpResponse adaptedResponse =

View File

@ -25,6 +25,7 @@ import reactor.io.netty.http.HttpChannel;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -41,10 +42,10 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
private final HttpChannel channel;
private final DataBufferAllocator allocator;
private final NettyDataBufferAllocator allocator;
public ReactorServerHttpRequest(HttpChannel request,
DataBufferAllocator allocator) {
NettyDataBufferAllocator allocator) {
Assert.notNull("'request' must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
this.channel = request;
@ -89,10 +90,7 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
@Override
public Flux<DataBuffer> getBody() {
return Flux.from(this.channel.receive()).map(bytes -> {
ByteBuffer byteBuffer = bytes.byteBuffer();
return allocator.wrap(byteBuffer);
});
return this.channel.receive().map(allocator::wrap);
}
}

View File

@ -16,17 +16,19 @@
package org.springframework.http.server.reactive;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import reactor.io.netty.http.HttpChannel;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
@ -60,8 +62,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
@Override
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
return Mono.from(this.channel.send(
Flux.from(publisher).map(buffer -> new Buffer(buffer.asByteBuffer()))));
return this.channel.send(Flux.from(publisher).map(this::toByteBuf));
}
@Override
@ -90,4 +91,12 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
}
}
private ByteBuf toByteBuf(DataBuffer buffer) {
if (buffer instanceof NettyDataBuffer) {
return ((NettyDataBuffer) buffer).getNativeBuffer();
}
else {
return Unpooled.wrappedBuffer(buffer.asByteBuffer());
}
}
}

View File

@ -18,10 +18,7 @@ package org.springframework.http.server.reactive.boot;
import reactor.core.flow.Loopback;
import reactor.core.state.Completable;
import reactor.io.buffer.Buffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.util.Assert;
@ -35,19 +32,13 @@ public class ReactorHttpServer extends HttpServerSupport
private reactor.io.netty.http.HttpServer reactorServer;
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
private boolean running;
public void setAllocator(DataBufferAllocator allocator) {
this.allocator = allocator;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(getHttpHandler());
this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler(), allocator);
this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler());
this.reactorServer = (getPort() != -1 ? reactor.io.netty.http.HttpServer.create(getPort()) :
reactor.io.netty.http.HttpServer.create());

View File

@ -65,7 +65,7 @@ public class WebClientIntegrationTests {
.perform(get(baseUrl.toString()))
.extract(headers());
TestSubscriber<HttpHeaders> ts = new TestSubscriber();
TestSubscriber<HttpHeaders> ts = new TestSubscriber<>();
result.subscribe(ts);
ts.awaitAndAssertNextValuesWith(
httpHeaders -> {
@ -92,7 +92,7 @@ public class WebClientIntegrationTests {
.extract(body(String.class));
TestSubscriber<String> ts = new TestSubscriber();
TestSubscriber<String> ts = new TestSubscriber<>();
result.subscribe(ts);
ts.awaitAndAssertNextValues("Hello Spring!").assertComplete();
@ -114,15 +114,12 @@ public class WebClientIntegrationTests {
.accept(MediaType.TEXT_PLAIN))
.extract(response(String.class));
TestSubscriber<ResponseEntity<String>> ts = new TestSubscriber();
TestSubscriber<ResponseEntity<String>> ts = new TestSubscriber<>();
result.subscribe(ts);
ts.awaitAndAssertNextValuesWith(new Consumer<ResponseEntity<String>>() {
@Override
public void accept(ResponseEntity<String> response) {
assertEquals(200, response.getStatusCode().value());
assertEquals(MediaType.TEXT_PLAIN, response.getHeaders().getContentType());
assertEquals("Hello Spring!", response.getBody());
}
ts.awaitAndAssertNextValuesWith((Consumer<ResponseEntity<String>>) response -> {
assertEquals(200, response.getStatusCode().value());
assertEquals(MediaType.TEXT_PLAIN, response.getHeaders().getContentType());
assertEquals("Hello Spring!", response.getBody());
});
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
@ -143,7 +140,7 @@ public class WebClientIntegrationTests {
.accept(MediaType.APPLICATION_JSON))
.extract(body(String.class));
TestSubscriber<String> ts = new TestSubscriber();
TestSubscriber<String> ts = new TestSubscriber<>();
result.subscribe(ts);
ts.awaitAndAssertNextValues(content).assertComplete();
RecordedRequest request = server.takeRequest();
@ -164,7 +161,7 @@ public class WebClientIntegrationTests {
.accept(MediaType.APPLICATION_JSON))
.extract(body(Pojo.class));
TestSubscriber<Pojo> ts = new TestSubscriber();
TestSubscriber<Pojo> ts = new TestSubscriber<>();
result.subscribe(ts);
ts.awaitAndAssertNextValuesWith(p -> assertEquals("barbar", p.getBar())).assertComplete();
RecordedRequest request = server.takeRequest();
@ -185,7 +182,7 @@ public class WebClientIntegrationTests {
.accept(MediaType.APPLICATION_JSON))
.extract(bodyStream(Pojo.class));
TestSubscriber<Pojo> ts = new TestSubscriber();
TestSubscriber<Pojo> ts = new TestSubscriber<>();
result.subscribe(ts);
ts.awaitAndAssertNextValuesWith(
p -> assertThat(p.getBar(), Matchers.is("bar1")),
@ -209,7 +206,7 @@ public class WebClientIntegrationTests {
.accept(MediaType.APPLICATION_JSON))
.extract(responseStream(Pojo.class));
TestSubscriber<ResponseEntity<Flux<Pojo>>> ts = new TestSubscriber();
TestSubscriber<ResponseEntity<Flux<Pojo>>> ts = new TestSubscriber<>();
result.subscribe(ts);
ts.awaitAndAssertNextValuesWith(
response -> {
@ -237,7 +234,7 @@ public class WebClientIntegrationTests {
.accept(MediaType.APPLICATION_JSON))
.extract(body(Pojo.class));
TestSubscriber<Pojo> ts = new TestSubscriber();
TestSubscriber<Pojo> ts = new TestSubscriber<>();
result.subscribe(ts);
ts.awaitAndAssertNextValuesWith(p -> assertEquals("BARBAR", p.getBar())).assertComplete();
@ -261,7 +258,7 @@ public class WebClientIntegrationTests {
.extract(body(String.class));
TestSubscriber<String> ts = new TestSubscriber();
TestSubscriber<String> ts = new TestSubscriber<>();
result.subscribe(ts);
// TODO: error message should be converted to a ClientException
ts.await().assertError();