Added DataBufferAllocator property to ReactiveHttpOutputMessage, and use that to pass on to Encoder.encode().

This commit is contained in:
Arjen Poutsma 2016-03-18 16:14:33 +01:00
parent 1836b2825b
commit 7f786ce4d7
20 changed files with 109 additions and 43 deletions

View File

@ -22,6 +22,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
/**
* A "reactive" HTTP output message that accepts output as a {@link Publisher}.
@ -48,4 +49,11 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
*/
Mono<Void> setBody(Publisher<DataBuffer> body);
/**
* Returns a {@link DataBufferAllocator} that can be used for creating the body.
* @return a buffer allocator
* @see #setBody(Publisher)
*/
DataBufferAllocator allocator();
}

View File

@ -61,6 +61,11 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
this.httpClient = httpClient;
}
@Override
public DataBufferAllocator allocator() {
return this.allocator;
}
@Override
public HttpMethod getMethod() {
return this.httpMethod;

View File

@ -17,7 +17,6 @@
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -32,6 +31,7 @@ import reactor.core.publisher.Mono;
import rx.Observable;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
@ -60,6 +60,11 @@ public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
this.allocator = allocator;
}
@Override
public DataBufferAllocator allocator() {
return this.allocator;
}
/**
* Set the body of the message to the given {@link Publisher}.
*

View File

@ -13,17 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
@ -39,32 +41,48 @@ import org.springframework.util.MultiValueMap;
*/
public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
private static final int STATE_NEW = 1;
private static final int STATE_COMMITTING = 2;
private static final int STATE_COMMITTED = 3;
private final HttpHeaders headers;
private final MultiValueMap<String, ResponseCookie> cookies;
private AtomicReference<State> state = new AtomicReference<>(State.NEW);
private final AtomicInteger state = new AtomicInteger(STATE_NEW);
private final List<Supplier<? extends Mono<Void>>> beforeCommitActions = new ArrayList<>(4);
private final DataBufferAllocator allocator;
protected AbstractServerHttpResponse() {
public AbstractServerHttpResponse(DataBufferAllocator allocator) {
Assert.notNull(allocator, "'allocator' must not be null");
this.allocator = allocator;
this.headers = new HttpHeaders();
this.cookies = new LinkedMultiValueMap<String, ResponseCookie>();
}
@Override
public final DataBufferAllocator allocator() {
return this.allocator;
}
@Override
public HttpHeaders getHeaders() {
if (State.COMITTED.equals(this.state.get())) {
if (STATE_COMMITTED == this.state.get()) {
return HttpHeaders.readOnlyHttpHeaders(this.headers);
}
return this.headers;
else {
return this.headers;
}
}
@Override
public MultiValueMap<String, ResponseCookie> getCookies() {
if (State.COMITTED.equals(this.state.get())) {
if (STATE_COMMITTED == this.state.get()) {
return CollectionUtils.unmodifiableMultiValueMap(this.cookies);
}
return this.cookies;
@ -78,16 +96,16 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
private Mono<Void> applyBeforeCommit() {
Mono<Void> mono = Mono.empty();
if (this.state.compareAndSet(State.NEW, State.COMMITTING)) {
if (this.state.compareAndSet(STATE_NEW, STATE_COMMITTING)) {
for (Supplier<? extends Mono<Void>> action : this.beforeCommitActions) {
mono = mono.after(() -> action.get());
mono = mono.after(action);
}
mono = mono.otherwise(ex -> {
// Ignore errors from beforeCommit actions
return Mono.empty();
});
mono = mono.after(() -> {
this.state.set(State.COMITTED);
this.state.set(STATE_COMMITTED);
writeHeaders();
writeCookies();
return Mono.empty();
@ -125,7 +143,4 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
return applyBeforeCommit();
}
private enum State { NEW, COMMITTING, COMITTED }
}

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import reactor.core.publisher.Mono;
@ -44,7 +45,8 @@ public class ReactorHttpHandlerAdapter
public Mono<Void> apply(HttpChannel<Buffer, Buffer> channel) {
ReactorServerHttpRequest adaptedRequest =
new ReactorServerHttpRequest(channel, allocator);
ReactorServerHttpResponse adaptedResponse = new ReactorServerHttpResponse(channel);
ReactorServerHttpResponse adaptedResponse =
new ReactorServerHttpResponse(channel, allocator);
return this.httpHandler.handle(adaptedRequest, adaptedResponse);
}

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.time.Duration;
@ -27,6 +28,7 @@ import reactor.io.netty.http.model.Cookie;
import reactor.io.netty.http.model.Status;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
@ -41,8 +43,9 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
private final HttpChannel<?, Buffer> channel;
public ReactorServerHttpResponse(HttpChannel<?, Buffer> response) {
public ReactorServerHttpResponse(HttpChannel<?, Buffer> response,
DataBufferAllocator allocator) {
super(allocator);
Assert.notNull("'response' must not be null.");
this.channel = response;
}

View File

@ -46,7 +46,8 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
RxNettyServerHttpRequest adaptedRequest =
new RxNettyServerHttpRequest(request, allocator);
RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response);
RxNettyServerHttpResponse adaptedResponse =
new RxNettyServerHttpResponse(response, allocator);
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
return RxJava1ObservableConverter.from(result);
}

View File

@ -29,6 +29,7 @@ import rx.Observable;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
@ -43,8 +44,11 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
private final HttpServerResponse<ByteBuf> response;
public RxNettyServerHttpResponse(HttpServerResponse<ByteBuf> response) {
public RxNettyServerHttpResponse(HttpServerResponse<ByteBuf> response,
NettyDataBufferAllocator allocator) {
super(allocator);
Assert.notNull("'response', response must not be null.");
this.response = response;
}

View File

@ -82,7 +82,8 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
this.bufferSize);
ServletServerHttpResponse response =
new ServletServerHttpResponse(synchronizer, this.bufferSize);
new ServletServerHttpResponse(synchronizer, this.bufferSize,
this.allocator);
HandlerResultSubscriber resultSubscriber =
new HandlerResultSubscriber(synchronizer);

View File

@ -34,6 +34,7 @@ import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
@ -53,7 +54,8 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse {
private final ResponseBodySubscriber responseBodySubscriber;
public ServletServerHttpResponse(ServletAsyncContextSynchronizer synchronizer,
int bufferSize) throws IOException {
int bufferSize, DataBufferAllocator allocator) throws IOException {
super(allocator);
Assert.notNull(synchronizer, "'synchronizer' must not be null");
this.response = (HttpServletResponse) synchronizer.getResponse();
@ -62,7 +64,6 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse {
this.response.getOutputStream().setWriteListener(responseBodySubscriber);
}
public HttpServletResponse getServletResponse() {
return this.response;
}

View File

@ -57,6 +57,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
private final HttpHandler delegate;
// TODO: use UndertowDBA when introduced
private final DataBufferAllocator allocator;
public UndertowHttpHandlerAdapter(HttpHandler delegate,
@ -76,7 +77,9 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(exchange);
ServerHttpResponse response = new UndertowServerHttpResponse(exchange,
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber)));
publisher -> Mono
.from(subscriber -> publisher.subscribe(responseBodySubscriber)),
allocator);
exchange.dispatch();

View File

@ -28,6 +28,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
@ -44,10 +45,10 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse {
private final Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter;
public UndertowServerHttpResponse(HttpServerExchange exchange,
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter) {
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter,
DataBufferAllocator allocator) {
super(allocator);
Assert.notNull(exchange, "'exchange' is required.");
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
this.exchange = exchange;

View File

@ -30,6 +30,7 @@ import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -146,7 +147,10 @@ public class DefaultHttpRequestBuilder implements HttpRequestBuilder {
Optional<Encoder<?>> messageEncoder = resolveEncoder(requestBodyType, mediaType);
if (messageEncoder.isPresent()) {
request.setBody(messageEncoder.get().encode(this.contentPublisher, requestBodyType, mediaType));
DataBufferAllocator allocator = request.allocator();
request.setBody(messageEncoder.get()
.encode(this.contentPublisher, allocator, requestBodyType,
mediaType));
}
else {
// TODO: wrap with client exception?

View File

@ -86,8 +86,8 @@ public final class WebClient {
public WebClient(ClientHttpRequestFactory requestFactory) {
this.requestFactory = requestFactory;
DataBufferAllocator allocator = new DefaultDataBufferAllocator();
this.messageEncoders = Arrays.asList(new ByteBufferEncoder(allocator), new StringEncoder(allocator),
new JacksonJsonEncoder(allocator));
this.messageEncoders = Arrays.asList(new ByteBufferEncoder(), new StringEncoder(),
new JacksonJsonEncoder());
this.messageDecoders = Arrays.asList(new ByteBufferDecoder(), new StringDecoder(allocator),
new JacksonJsonDecoder(new JsonObjectDecoder()));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -36,6 +36,7 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.codec.Encoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@ -192,7 +193,10 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
if (encoder != null) {
ServerHttpResponse response = exchange.getResponse();
response.getHeaders().setContentType(selectedMediaType);
return response.setBody(encoder.encode((Publisher) publisher, elementType, selectedMediaType));
DataBufferAllocator allocator = response.allocator();
return response.setBody(
encoder.encode((Publisher) publisher, allocator, elementType,
selectedMediaType));
}
}

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.util.function.Supplier;
@ -22,6 +23,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
@ -41,6 +44,8 @@ public class MockServerHttpResponse implements ServerHttpResponse {
private Publisher<DataBuffer> body;
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
@Override
public void setStatusCode(HttpStatus status) {
@ -80,4 +85,9 @@ public class MockServerHttpResponse implements ServerHttpResponse {
return Mono.empty();
}
@Override
public DataBufferAllocator allocator() {
return this.allocator;
}
}

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
@ -31,10 +32,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.*;
/**
* @author Rossen Stoyanchev
@ -147,6 +145,9 @@ public class ServerHttpResponseTests {
private final List<DataBuffer> content = new ArrayList<>();
public TestServerHttpResponse() {
super(new DefaultDataBufferAllocator());
}
@Override
public void setStatusCode(HttpStatus status) {

View File

@ -232,8 +232,7 @@ public class DispatcherHandlerErrorTests {
@Bean
public ResponseBodyResultHandler resultHandler() {
List<Encoder<?>> encoders = Collections
.singletonList(new StringEncoder(new DefaultDataBufferAllocator()));
List<Encoder<?>> encoders = Collections.singletonList(new StringEncoder());
return new ResponseBodyResultHandler(encoders, new DefaultConversionService());
}

View File

@ -379,9 +379,8 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@Bean
public ResponseBodyResultHandler responseBodyResultHandler() {
List<Encoder<?>> encoders = Arrays.asList(
new ByteBufferEncoder(this.allocator), new StringEncoder(this.allocator),
new JacksonJsonEncoder(this.allocator, new JsonObjectEncoder(this.allocator)));
List<Encoder<?>> encoders = Arrays.asList(new ByteBufferEncoder(),
new StringEncoder(), new JacksonJsonEncoder(new JsonObjectEncoder()));
ResponseBodyResultHandler resultHandler = new ResponseBodyResultHandler(encoders, conversionService());
resultHandler.setOrder(1);
return resultHandler;
@ -458,8 +457,9 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/raw")
public Publisher<ByteBuffer> rawResponseBody() {
JacksonJsonEncoder encoder = new JacksonJsonEncoder(new DefaultDataBufferAllocator());
return encoder.encode(Mono.just(new Person("Robert")),
DataBufferAllocator allocator = new DefaultDataBufferAllocator();
JacksonJsonEncoder encoder = new JacksonJsonEncoder();
return encoder.encode(Mono.just(new Person("Robert")), allocator,
ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON).map(DataBuffer::asByteBuffer);
}

View File

@ -24,7 +24,6 @@ import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.support.StringEncoder;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.ui.ExtendedModelMap;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.method.HandlerMethod;
@ -42,7 +41,7 @@ public class ResponseBodyResultHandlerTests {
@Test
public void supports() throws NoSuchMethodException {
ResponseBodyResultHandler handler = new ResponseBodyResultHandler(Collections.singletonList(
new StringEncoder(new DefaultDataBufferAllocator())),
new StringEncoder()),
new DefaultConversionService());
TestController controller = new TestController();