Updated http and web packages to use DataBuffer

This commit is contained in:
Arjen Poutsma 2016-01-21 10:40:50 +01:00
parent 2981b5e6e8
commit 225179bc6f
29 changed files with 303 additions and 193 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,11 +16,11 @@
package org.springframework.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
/**
* An "reactive" HTTP input message that exposes the input as {@link Publisher}.
*
@ -35,6 +35,6 @@ public interface ReactiveHttpInputMessage extends HttpMessage {
* Return the body of the message as a {@link Publisher}.
* @return the body content publisher
*/
Flux<ByteBuffer> getBody();
Flux<DataBuffer> getBody();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,11 +16,11 @@
package org.springframework.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
/**
* A "reactive" HTTP output message that accepts output as a {@link Publisher}.
*
@ -38,6 +38,6 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
* @param body the body content publisher
* @return a publisher that indicates completion or error.
*/
Mono<Void> setBody(Publisher<ByteBuffer> body);
Mono<Void> setBody(Publisher<DataBuffer> body);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,12 +15,11 @@
*/
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
/**
@ -46,7 +45,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
}
@Override
public Mono<Void> setBody(Publisher<ByteBuffer> publisher) {
public Mono<Void> setBody(Publisher<DataBuffer> publisher) {
return Flux.from(publisher).lift(new WriteWithOperator<>(writeWithPublisher -> {
writeHeaders();
return setBodyInternal(writeWithPublisher);
@ -57,7 +56,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
* Implement this method to write to the underlying the response.
* @param publisher the publisher to write with
*/
protected abstract Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher);
protected abstract Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher);
@Override
public void writeHeaders() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import reactor.io.buffer.Buffer;
import reactor.io.net.ReactiveChannelHandler;
import reactor.io.net.http.HttpChannel;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.Assert;
/**
@ -30,15 +31,19 @@ public class ReactorHttpHandlerAdapter
private final HttpHandler httpHandler;
private final DataBufferAllocator allocator;
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
public ReactorHttpHandlerAdapter(HttpHandler httpHandler,
DataBufferAllocator allocator) {
Assert.notNull(httpHandler, "'httpHandler' is required.");
this.httpHandler = httpHandler;
this.allocator = allocator;
}
@Override
public Mono<Void> apply(HttpChannel<Buffer, Buffer> channel) {
ReactorServerHttpRequest adaptedRequest = new ReactorServerHttpRequest(channel);
ReactorServerHttpRequest adaptedRequest =
new ReactorServerHttpRequest(channel, allocator);
ReactorServerHttpResponse adaptedResponse = new ReactorServerHttpResponse(channel);
return this.httpHandler.handle(adaptedRequest, adaptedResponse);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,6 +27,8 @@ import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.model.Cookie;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -41,10 +43,14 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
private final HttpChannel<Buffer, ?> channel;
private final DataBufferAllocator allocator;
public ReactorServerHttpRequest(HttpChannel<Buffer, ?> request) {
Assert.notNull("'request' must not be null.");
public ReactorServerHttpRequest(HttpChannel<Buffer, ?> request,
DataBufferAllocator allocator) {
Assert.notNull("'request' must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
this.channel = request;
this.allocator = allocator;
}
@ -84,8 +90,11 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
}
@Override
public Flux<ByteBuffer> getBody() {
return Flux.from(this.channel.input()).map(Buffer::byteBuffer);
public Flux<DataBuffer> getBody() {
return Flux.from(this.channel.input()).map(bytes -> {
ByteBuffer byteBuffer = bytes.byteBuffer();
return allocator.wrap(byteBuffer);
});
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,8 +15,6 @@
*/
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -25,6 +23,7 @@ import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.model.Cookie;
import reactor.io.net.http.model.Status;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
@ -56,8 +55,9 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
return Mono.from(this.channel.writeWith(Flux.from(publisher).map(Buffer::new)));
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
return Mono.from(this.channel.writeWith(
Flux.from(publisher).map(buffer -> new Buffer(buffer.asByteBuffer()))));
}
@Override

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,6 +24,7 @@ import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1ObservableConverter;
import rx.Observable;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.util.Assert;
/**
@ -33,15 +34,20 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
private final HttpHandler httpHandler;
private final NettyDataBufferAllocator allocator;
public RxNettyHttpHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "'httpHandler' is required.");
public RxNettyHttpHandlerAdapter(HttpHandler httpHandler,
NettyDataBufferAllocator allocator) {
Assert.notNull(httpHandler, "'httpHandler' is required");
Assert.notNull(allocator, "'allocator' must not be null");
this.httpHandler = httpHandler;
this.allocator = allocator;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request);
RxNettyServerHttpRequest adaptedRequest =
new RxNettyServerHttpRequest(request, allocator);
RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response);
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
return RxJava1ObservableConverter.from(result);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,6 @@ package org.springframework.http.server.reactive;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -30,6 +29,8 @@ import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.publisher.Flux;
import rx.Observable;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -45,9 +46,13 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
private final HttpServerRequest<ByteBuf> request;
private final NettyDataBufferAllocator allocator;
public RxNettyServerHttpRequest(HttpServerRequest<ByteBuf> request) {
Assert.notNull("'request', request must not be null.");
public RxNettyServerHttpRequest(HttpServerRequest<ByteBuf> request,
NettyDataBufferAllocator allocator) {
Assert.notNull("'request', request must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
this.allocator = allocator;
this.request = request;
}
@ -88,8 +93,8 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
}
@Override
public Flux<ByteBuffer> getBody() {
Observable<ByteBuffer> content = this.request.getContent().map(ByteBuf::nioBuffer);
public Flux<DataBuffer> getBody() {
Observable<DataBuffer> content = this.request.getContent().map(allocator::wrap);
content = content.concatWith(Observable.empty()); // See GH issue #58
return RxJava1ObservableConverter.from(content);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,8 +16,6 @@
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.DefaultCookie;
@ -27,6 +25,7 @@ import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.publisher.Mono;
import rx.Observable;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
@ -58,15 +57,15 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
Observable<byte[]> content = RxJava1ObservableConverter.from(publisher).map(this::toBytes);
Observable<Void> completion = this.response.writeBytes(content);
return RxJava1ObservableConverter.from(completion).after();
}
private byte[] toBytes(ByteBuffer buffer) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
private byte[] toBytes(DataBuffer buffer) {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
return bytes;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,8 +17,6 @@
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
@ -38,6 +36,9 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
@ -55,11 +56,16 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private HttpHandler handler;
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
public void setHandler(HttpHandler handler) {
this.handler = handler;
}
public void setAllocator(DataBufferAllocator allocator) {
this.allocator = allocator;
}
@Override
protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
@ -68,11 +74,13 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
AsyncContext context = servletRequest.startAsync();
ServletAsyncContextSynchronizer synchronizer = new ServletAsyncContextSynchronizer(context);
RequestBodyPublisher requestBody = new RequestBodyPublisher(synchronizer, BUFFER_SIZE);
RequestBodyPublisher requestBody =
new RequestBodyPublisher(synchronizer, allocator, BUFFER_SIZE);
ServletServerHttpRequest request = new ServletServerHttpRequest(servletRequest, requestBody);
servletRequest.getInputStream().setReadListener(requestBody);
ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(synchronizer);
ResponseBodySubscriber responseBodySubscriber =
new ResponseBodySubscriber(synchronizer, allocator);
ServletServerHttpResponse response = new ServletServerHttpResponse(servletResponse,
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber)));
servletResponse.getOutputStream().setWriteListener(responseBodySubscriber);
@ -81,30 +89,32 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
this.handler.handle(request, response).subscribe(resultSubscriber);
}
private static class RequestBodyPublisher implements ReadListener, Publisher<ByteBuffer> {
private static class RequestBodyPublisher
implements ReadListener, Publisher<DataBuffer> {
private final ServletAsyncContextSynchronizer synchronizer;
private final DataBufferAllocator allocator;
private final byte[] buffer;
private final DemandCounter demand = new DemandCounter();
private Subscriber<? super ByteBuffer> subscriber;
private Subscriber<? super DataBuffer> subscriber;
private boolean stalled;
private boolean cancelled;
public RequestBodyPublisher(ServletAsyncContextSynchronizer synchronizer, int bufferSize) {
public RequestBodyPublisher(ServletAsyncContextSynchronizer synchronizer,
DataBufferAllocator allocator, int bufferSize) {
this.synchronizer = synchronizer;
this.allocator = allocator;
this.buffer = new byte[bufferSize];
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
if (subscriber == null) {
throw new NullPointerException();
}
@ -146,11 +156,11 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
else if (read > 0) {
this.demand.decrement();
byte[] copy = Arrays.copyOf(this.buffer, read);
// logger.debug("Next: " + new String(copy, UTF_8));
DataBuffer dataBuffer = allocator.allocateBuffer(read);
dataBuffer.write(this.buffer, 0, read);
this.subscriber.onNext(ByteBuffer.wrap(copy));
this.subscriber.onNext(dataBuffer);
}
}
@ -265,19 +275,23 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
}
private static class ResponseBodySubscriber implements WriteListener, Subscriber<ByteBuffer> {
private static class ResponseBodySubscriber
implements WriteListener, Subscriber<DataBuffer> {
private final ServletAsyncContextSynchronizer synchronizer;
private final DataBufferAllocator allocator;
private Subscription subscription;
private ByteBuffer buffer;
private DataBuffer buffer;
private volatile boolean subscriberComplete = false;
public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer) {
public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer,
DataBufferAllocator allocator) {
this.synchronizer = synchronizer;
this.allocator = allocator;
}
@ -288,8 +302,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
@Override
public void onNext(ByteBuffer bytes) {
public void onNext(DataBuffer bytes) {
Assert.isNull(buffer);
this.buffer = bytes;
@ -321,8 +334,8 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
if (ready) {
if (this.buffer != null) {
byte[] bytes = new byte[this.buffer.remaining()];
this.buffer.get(bytes);
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
this.buffer = null;
output.write(bytes);
if (!subscriberComplete) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,6 @@ package org.springframework.http.server.reactive;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Enumeration;
@ -30,6 +29,7 @@ import javax.servlet.http.HttpServletRequest;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -47,10 +47,10 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
private final HttpServletRequest request;
private final Flux<ByteBuffer> requestBodyPublisher;
private final Flux<DataBuffer> requestBodyPublisher;
public ServletServerHttpRequest(HttpServletRequest request, Publisher<ByteBuffer> body) {
public ServletServerHttpRequest(HttpServletRequest request,
Publisher<DataBuffer> body) {
Assert.notNull(request, "'request' must not be null.");
Assert.notNull(body, "'body' must not be null.");
this.request = request;
@ -125,7 +125,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
}
@Override
public Flux<ByteBuffer> getBody() {
public Flux<DataBuffer> getBody() {
return this.requestBodyPublisher;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
@ -27,6 +26,7 @@ import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@ -41,11 +41,11 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse {
private final HttpServletResponse response;
private final Function<Publisher<ByteBuffer>, Mono<Void>> responseBodyWriter;
private final Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter;
public ServletServerHttpResponse(HttpServletResponse response,
Function<Publisher<ByteBuffer>, Mono<Void>> responseBodyWriter) {
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter) {
Assert.notNull(response, "'response' must not be null");
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
@ -64,7 +64,7 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
return this.responseBodyWriter.apply(publisher);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -33,6 +33,8 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.IoUtils;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
import reactor.core.publisher.Mono;
@ -40,13 +42,10 @@ import reactor.core.subscriber.BaseSubscriber;
import reactor.core.util.BackpressureUtils;
import reactor.core.util.Exceptions;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.Assert;
import static org.xnio.ChannelListeners.closingChannelExceptionHandler;
import static org.xnio.ChannelListeners.flushingChannelListener;
import static org.xnio.IoUtils.safeClose;
/**
* @author Marek Hawrylczak
* @author Rossen Stoyanchev
@ -58,17 +57,21 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
private final HttpHandler delegate;
private final DataBufferAllocator allocator;
public UndertowHttpHandlerAdapter(HttpHandler delegate) {
Assert.notNull(delegate, "'delegate' is required.");
public UndertowHttpHandlerAdapter(HttpHandler delegate,
DataBufferAllocator allocator) {
Assert.notNull(delegate, "'delegate' is required");
Assert.notNull(allocator, "'allocator' must not be null");
this.delegate = delegate;
this.allocator = allocator;
}
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
RequestBodyPublisher requestBody = new RequestBodyPublisher(exchange);
RequestBodyPublisher requestBody = new RequestBodyPublisher(exchange, allocator);
ServerHttpRequest request = new UndertowServerHttpRequest(exchange, requestBody);
ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(exchange);
@ -107,8 +110,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
});
}
private static class RequestBodyPublisher implements Publisher<ByteBuffer> {
private static class RequestBodyPublisher implements Publisher<DataBuffer> {
private static final AtomicLongFieldUpdater<RequestBodySubscription> DEMAND =
AtomicLongFieldUpdater.newUpdater(RequestBodySubscription.class, "demand");
@ -116,16 +118,18 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
private final HttpServerExchange exchange;
private Subscriber<? super ByteBuffer> subscriber;
private final DataBufferAllocator allocator;
private Subscriber<? super DataBuffer> subscriber;
public RequestBodyPublisher(HttpServerExchange exchange) {
public RequestBodyPublisher(HttpServerExchange exchange,
DataBufferAllocator allocator) {
this.exchange = exchange;
this.allocator = allocator;
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
if (subscriber == null) {
throw Exceptions.spec_2_13_exception();
}
@ -175,11 +179,11 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
private void close() {
if (this.pooledBuffer != null) {
safeClose(this.pooledBuffer);
IoUtils.safeClose(this.pooledBuffer);
this.pooledBuffer = null;
}
if (this.channel != null) {
safeClose(this.channel);
IoUtils.safeClose(this.channel);
this.channel = null;
}
}
@ -251,7 +255,8 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
private void doOnNext(ByteBuffer buffer) {
this.draining = false;
buffer.flip();
subscriber.onNext(buffer);
DataBuffer dataBuffer = allocator.wrap(buffer);
subscriber.onNext(dataBuffer);
}
private void doOnComplete() {
@ -315,7 +320,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
}
}
private static class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
private static class ResponseBodySubscriber extends BaseSubscriber<DataBuffer>
implements ChannelListener<StreamSinkChannel> {
private final HttpServerExchange exchange;
@ -343,8 +348,10 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
}
@Override
public void onNext(ByteBuffer buffer) {
super.onNext(buffer);
public void onNext(DataBuffer dataBuffer) {
super.onNext(dataBuffer);
ByteBuffer buffer = dataBuffer.asByteBuffer();
if (this.responseChannel == null) {
this.responseChannel = exchange.getResponseChannel();
@ -407,7 +414,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
} while (buffer.hasRemaining() && c > 0);
if (!buffer.hasRemaining()) {
safeClose(this.buffers.remove());
IoUtils.safeClose(this.buffers.remove());
}
} while (!this.buffers.isEmpty() && c > 0);
@ -461,8 +468,10 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
this.responseChannel.shutdownWrites();
if (!this.responseChannel.flush()) {
this.responseChannel.getWriteSetter().set(flushingChannelListener(
o -> safeClose(this.responseChannel), closingChannelExceptionHandler()));
this.responseChannel.getWriteSetter().set(ChannelListeners
.flushingChannelListener(
o -> IoUtils.safeClose(this.responseChannel),
ChannelListeners.closingChannelExceptionHandler()));
this.responseChannel.resumeWrites();
}
this.responseChannel = null;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,6 @@ package org.springframework.http.server.reactive;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -29,6 +28,7 @@ import io.undertow.util.HeaderValues;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -44,10 +44,10 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
private final HttpServerExchange exchange;
private final Flux<ByteBuffer> body;
private final Flux<DataBuffer> body;
public UndertowServerHttpRequest(HttpServerExchange exchange, Publisher<ByteBuffer> body) {
public UndertowServerHttpRequest(HttpServerExchange exchange,
Publisher<DataBuffer> body) {
Assert.notNull(exchange, "'exchange' is required.");
Assert.notNull(exchange, "'body' is required.");
this.exchange = exchange;
@ -92,7 +92,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
}
@Override
public Flux<ByteBuffer> getBody() {
public Flux<DataBuffer> getBody() {
return this.body;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -28,6 +27,7 @@ import io.undertow.util.HttpString;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
@ -42,11 +42,11 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse {
private final HttpServerExchange exchange;
private final Function<Publisher<ByteBuffer>, Mono<Void>> responseBodyWriter;
private final Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter;
public UndertowServerHttpResponse(HttpServerExchange exchange,
Function<Publisher<ByteBuffer>, Mono<Void>> responseBodyWriter) {
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter) {
Assert.notNull(exchange, "'exchange' is required.");
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
@ -66,7 +66,7 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
protected Mono<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
return this.responseBodyWriter.apply(publisher);
}

View File

@ -1,11 +1,11 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -21,7 +21,8 @@ import reactor.core.state.Completable;
import reactor.io.buffer.Buffer;
import reactor.io.net.ReactiveNet;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.util.Assert;
@ -29,29 +30,35 @@ import org.springframework.util.Assert;
* @author Stephane Maldini
*/
public class ReactorHttpServer extends HttpServerSupport
implements InitializingBean, HttpServer, Connectable, Completable {
implements HttpServer, Connectable, Completable {
private ReactorHttpHandlerAdapter reactorHandler;
private reactor.io.net.http.HttpServer<Buffer, Buffer> reactorServer;
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
private boolean running;
@Override
public boolean isRunning() {
return this.running;
public void setAllocator(DataBufferAllocator allocator) {
this.allocator = allocator;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(getHttpHandler());
this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler());
this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler(), allocator);
this.reactorServer = (getPort() != -1 ? ReactiveNet.httpServer(getPort()) :
ReactiveNet.httpServer());
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public Object connectedInput() {
return reactorServer;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,8 +17,10 @@
package org.springframework.http.server.reactive.boot;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.server.reactive.RxNettyHttpHandlerAdapter;
import org.springframework.util.Assert;
@ -26,14 +28,34 @@ import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
*/
public class RxNettyHttpServer extends HttpServerSupport implements InitializingBean, HttpServer {
public class RxNettyHttpServer extends HttpServerSupport implements HttpServer {
private RxNettyHttpHandlerAdapter rxNettyHandler;
private io.reactivex.netty.protocol.http.server.HttpServer<ByteBuf, ByteBuf> rxNettyServer;
private NettyDataBufferAllocator allocator;
private boolean running;
public void setAllocator(ByteBufAllocator allocator) {
Assert.notNull(allocator, "'allocator' must not be null");
this.allocator = new NettyDataBufferAllocator(allocator);
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(getHttpHandler());
if (allocator == null) {
allocator = new NettyDataBufferAllocator(UnpooledByteBufAllocator.DEFAULT);
}
this.rxNettyHandler = new RxNettyHttpHandlerAdapter(getHttpHandler(), allocator);
this.rxNettyServer = (getPort() != -1 ?
io.reactivex.netty.protocol.http.server.HttpServer.newServer(getPort()) :
io.reactivex.netty.protocol.http.server.HttpServer.newServer());
}
@Override
public boolean isRunning() {
@ -41,18 +63,6 @@ public class RxNettyHttpServer extends HttpServerSupport implements Initializing
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(getHttpHandler());
this.rxNettyHandler = new RxNettyHttpHandlerAdapter(getHttpHandler());
this.rxNettyServer = (getPort() != -1 ?
io.reactivex.netty.protocol.http.server.HttpServer.newServer(getPort()) :
io.reactivex.netty.protocol.http.server.HttpServer.newServer());
}
@Override
public void start() {
if (!this.running) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,24 +19,30 @@ package org.springframework.http.server.reactive.boot;
import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter;
import org.springframework.util.Assert;
/**
* @author Marek Hawrylczak
*/
public class UndertowHttpServer extends HttpServerSupport implements InitializingBean, HttpServer {
public class UndertowHttpServer extends HttpServerSupport implements HttpServer {
private Undertow server;
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
private boolean running;
public void setAllocator(DataBufferAllocator allocator) {
this.allocator = allocator;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(getHttpHandler());
HttpHandler handler = new UndertowHttpHandlerAdapter(getHttpHandler());
HttpHandler handler = new UndertowHttpHandlerAdapter(getHttpHandler(), allocator);
int port = (getPort() != -1 ? getPort() : 8080);
this.server = Undertow.builder().addHttpListener(port, "localhost")
.setHandler(handler).build();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.web.reactive.method.annotation;
import java.nio.ByteBuffer;
import java.util.List;
import org.reactivestreams.Publisher;
@ -27,6 +26,7 @@ import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.RequestBody;
@ -64,7 +64,7 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
mediaType = MediaType.APPLICATION_OCTET_STREAM;
}
ResolvableType type = ResolvableType.forMethodParameter(parameter);
Flux<ByteBuffer> body = exchange.getRequest().getBody();
Flux<DataBuffer> body = exchange.getRequest().getBody();
Flux<?> elementFlux = body;
ResolvableType elementType = type.hasGenerics() ? type.getGeneric(0) : type;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
@ -35,6 +36,8 @@ import org.springframework.core.codec.support.JsonObjectDecoder;
import org.springframework.core.codec.support.StringDecoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.util.ObjectUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.method.annotation.ExceptionHandlerMethodResolver;
@ -57,6 +60,9 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
private ConversionService conversionService = new DefaultConversionService();
private DataBufferAllocator allocator =
new NettyDataBufferAllocator(new UnpooledByteBufAllocator(false));
private final Map<Class<?>, ExceptionHandlerMethodResolver> exceptionHandlerCache =
new ConcurrentHashMap<Class<?>, ExceptionHandlerMethodResolver>(64);
@ -85,13 +91,17 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
return this.conversionService;
}
public void setAllocator(DataBufferAllocator allocator) {
this.allocator = allocator;
}
@Override
public void afterPropertiesSet() throws Exception {
if (ObjectUtils.isEmpty(this.argumentResolvers)) {
List<Decoder<?>> decoders = Arrays.asList(new ByteBufferDecoder(),
new StringDecoder(), new JacksonJsonDecoder(new JsonObjectDecoder()));
new StringDecoder(allocator),
new JacksonJsonDecoder(new JsonObjectDecoder(allocator)));
this.argumentResolvers.add(new RequestParamArgumentResolver());
this.argumentResolvers.add(new RequestBodyArgumentResolver(decoders, this.conversionService));

View File

@ -1,11 +1,11 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -31,6 +31,8 @@ import reactor.core.timer.Timers;
import reactor.io.buffer.Buffer;
import reactor.rx.Stream;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.server.reactive.boot.HttpServer;
@ -52,6 +54,8 @@ public class AsyncIntegrationTests {
private final ProcessorGroup asyncGroup = Processors.asyncGroup();
private final DataBufferAllocator allocator = new DefaultDataBufferAllocator();
protected int port;
@Parameterized.Parameter(0)
@ -109,7 +113,7 @@ public class AsyncIntegrationTests {
.dispatchOn(asyncGroup)
.collect(Buffer::new, Buffer::append)
.doOnSuccess(Buffer::flip)
.map(Buffer::byteBuffer)
.map((bytes) -> allocator.wrap(bytes.byteBuffer()))
);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,11 +16,11 @@
package org.springframework.http.server.reactive;
import java.net.URI;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -35,7 +35,7 @@ public class MockServerHttpRequest implements ServerHttpRequest {
private HttpHeaders headers = new HttpHeaders();
private Flux<ByteBuffer> body;
private Flux<DataBuffer> body;
public MockServerHttpRequest(HttpMethod httpMethod, URI uri) {
@ -43,7 +43,8 @@ public class MockServerHttpRequest implements ServerHttpRequest {
this.uri = uri;
}
public MockServerHttpRequest(Publisher<ByteBuffer> body, HttpMethod httpMethod, URI uri) {
public MockServerHttpRequest(Publisher<DataBuffer> body, HttpMethod httpMethod,
URI uri) {
this.body = Flux.from(body);
this.httpMethod = httpMethod;
this.uri = uri;
@ -78,11 +79,11 @@ public class MockServerHttpRequest implements ServerHttpRequest {
}
@Override
public Flux<ByteBuffer> getBody() {
public Flux<DataBuffer> getBody() {
return this.body;
}
public void setBody(Publisher<ByteBuffer> body) {
public void setBody(Publisher<DataBuffer> body) {
this.body = Flux.from(body);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,12 +15,12 @@
*/
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
@ -33,7 +33,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
private HttpHeaders headers = new HttpHeaders();
private Publisher<ByteBuffer> body;
private Publisher<DataBuffer> body;
@Override
@ -51,12 +51,12 @@ public class MockServerHttpResponse implements ServerHttpResponse {
}
@Override
public Mono<Void> setBody(Publisher<ByteBuffer> body) {
public Mono<Void> setBody(Publisher<DataBuffer> body) {
this.body = body;
return Flux.from(body).after();
}
public Publisher<ByteBuffer> getBody() {
public Publisher<DataBuffer> getBody() {
return this.body;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -24,7 +23,9 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import static org.junit.Assert.assertEquals;
@ -42,7 +43,7 @@ public class RandomHandler implements HttpHandler {
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
request.getBody().subscribe(new Subscriber<ByteBuffer>() {
request.getBody().subscribe(new Subscriber<DataBuffer>() {
private Subscription s;
private int requestSize = 0;
@ -54,8 +55,8 @@ public class RandomHandler implements HttpHandler {
}
@Override
public void onNext(ByteBuffer bytes) {
requestSize += new Buffer(bytes).limit();
public void onNext(DataBuffer bytes) {
requestSize += bytes.readableByteCount();
}
@Override
@ -72,7 +73,11 @@ public class RandomHandler implements HttpHandler {
});
response.getHeaders().setContentLength(RESPONSE_SIZE);
return response.setBody(Mono.just(ByteBuffer.wrap(randomBytes())));
byte[] randomBytes = randomBytes();
DataBuffer buffer =
new DefaultDataBufferAllocator().allocateBuffer(randomBytes.length);
buffer.write(randomBytes);
return response.setBody(Mono.just(buffer));
}
private byte[] randomBytes() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.http.server.reactive;
import java.io.InputStream;
import java.io.OutputStream;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
@ -26,9 +28,10 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.util.BufferOutputStream;
import org.springframework.util.ByteBufferPublisherInputStream;
import static org.junit.Assert.fail;
@ -49,7 +52,7 @@ public class XmlHandler implements HttpHandler {
Runnable r = () -> {
try {
ByteBufferPublisherInputStream bis = new ByteBufferPublisherInputStream(request.getBody());
InputStream bis = DataBufferUtils.toInputStream(request.getBody());
XmlHandlerIntegrationTests.Person johnDoe =
(XmlHandlerIntegrationTests.Person) unmarshaller.unmarshal(bis);
@ -67,13 +70,13 @@ public class XmlHandler implements HttpHandler {
response.getHeaders().setContentType(MediaType.APPLICATION_XML);
XmlHandlerIntegrationTests.Person janeDoe = new XmlHandlerIntegrationTests.Person("Jane Doe");
Buffer buffer = new Buffer();
BufferOutputStream bos = new BufferOutputStream(buffer);
DataBuffer buffer = new DefaultDataBufferAllocator().allocateBuffer();
OutputStream bos = buffer.asOutputStream();
marshaller.marshal(janeDoe, bos);
bos.close();
buffer.flip();
return response.setBody(Flux.just(buffer.byteBuffer()));
return response.setBody(Flux.just(buffer));
}
catch (Exception ex) {
logger.error(ex, ex);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@ -33,6 +33,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.support.StringEncoder;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@ -109,7 +111,7 @@ public class DispatcherHandlerErrorTests {
@Test
public void noResolverForArgument() throws Exception {
this.request.setUri(new URI("/uknown-argument-type"));
this.request.setUri(new URI("/unknown-argument-type"));
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
Throwable ex = awaitErrorSignal(publisher);
@ -153,7 +155,9 @@ public class DispatcherHandlerErrorTests {
public void notAcceptable() throws Exception {
this.request.setUri(new URI("/request-body"));
this.request.getHeaders().setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
this.request.setBody(Mono.just(ByteBuffer.wrap("body".getBytes("UTF-8"))));
DataBuffer buffer = new DefaultDataBufferAllocator().allocateBuffer()
.write("body".getBytes("UTF-8"));
this.request.setBody(Mono.just(buffer));
Publisher<Void> publisher = this.dispatcherHandler.handle(this.exchange);
Throwable ex = awaitErrorSignal(publisher);
@ -178,7 +182,7 @@ public class DispatcherHandlerErrorTests {
@Test
public void dispatcherHandlerWithHttpExceptionHandler() throws Exception {
this.request.setUri(new URI("/uknown-argument-type"));
this.request.setUri(new URI("/unknown-argument-type"));
WebExceptionHandler exceptionHandler = new ServerError500ExceptionHandler();
WebHandler webHandler = new ExceptionHandlingWebHandler(this.dispatcherHandler, exceptionHandler);
@ -190,7 +194,7 @@ public class DispatcherHandlerErrorTests {
@Test
public void filterChainWithHttpExceptionHandler() throws Exception {
this.request.setUri(new URI("/uknown-argument-type"));
this.request.setUri(new URI("/unknown-argument-type"));
WebHandler webHandler = new FilteringWebHandler(this.dispatcherHandler, new TestWebFilter());
webHandler = new ExceptionHandlingWebHandler(webHandler, new ServerError500ExceptionHandler());
@ -224,7 +228,8 @@ public class DispatcherHandlerErrorTests {
@Bean
public ResponseBodyResultHandler resultHandler() {
List<Encoder<?>> encoders = Collections.singletonList(new StringEncoder());
List<Encoder<?>> encoders = Collections
.singletonList(new StringEncoder(new DefaultDataBufferAllocator()));
return new ResponseBodyResultHandler(encoders, new DefaultConversionService());
}
@ -238,8 +243,8 @@ public class DispatcherHandlerErrorTests {
@SuppressWarnings("unused")
private static class TestController {
@RequestMapping("/uknown-argument-type")
public void uknownArgumentType(Foo arg) {
@RequestMapping("/unknown-argument-type")
public void unknownArgumentType(Foo arg) {
}
@RequestMapping("/error-signal")

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,7 @@ package org.springframework.web.reactive.handler;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@ -27,6 +28,8 @@ import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.HttpStatus;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
@ -139,7 +142,9 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler
@Override
public Mono<Void> handle(WebServerExchange exchange) {
return exchange.getResponse().setBody(Flux.just(Buffer.wrap("foo").byteBuffer()));
DataBuffer buffer = new DefaultDataBufferAllocator().allocateBuffer()
.write("foo".getBytes(StandardCharsets.UTF_8));
return exchange.getResponse().setBody(Flux.just(buffer));
}
}
@ -147,7 +152,9 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler
@Override
public Mono<Void> handle(WebServerExchange exchange) {
return exchange.getResponse().setBody(Flux.just(Buffer.wrap("bar").byteBuffer()));
DataBuffer buffer = new DefaultDataBufferAllocator().allocateBuffer()
.write("bar".getBytes(StandardCharsets.UTF_8));
return exchange.getResponse().setBody(Flux.just(buffer));
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -48,6 +48,9 @@ import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorStreamConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
@ -380,8 +383,10 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@Bean
public ResponseBodyResultHandler responseBodyResultHandler() {
DataBufferAllocator allocator = new DefaultDataBufferAllocator();
return new ResponseBodyResultHandler(Arrays.asList(
new ByteBufferEncoder(), new StringEncoder(), new JacksonJsonEncoder(new JsonObjectEncoder())),
new ByteBufferEncoder(allocator), new StringEncoder(allocator),
new JacksonJsonEncoder(allocator, new JsonObjectEncoder(allocator))),
conversionService());
}
@ -426,9 +431,9 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/raw")
public Publisher<ByteBuffer> rawResponseBody() {
JacksonJsonEncoder encoder = new JacksonJsonEncoder();
JacksonJsonEncoder encoder = new JacksonJsonEncoder(new DefaultDataBufferAllocator());
return encoder.encode(Stream.just(new Person("Robert")),
ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON);
ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON).map(DataBuffer::asByteBuffer);
}
@RequestMapping("/stream-result")

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,11 +22,12 @@ import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.codec.support.StringEncoder;
import org.springframework.web.reactive.HandlerResult;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerResult;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -40,7 +41,8 @@ public class ResponseBodyResultHandlerTests {
@Test
public void supports() throws NoSuchMethodException {
ResponseBodyResultHandler handler = new ResponseBodyResultHandler(Collections.singletonList(
new StringEncoder()), new DefaultConversionService());
new StringEncoder(new DefaultDataBufferAllocator())),
new DefaultConversionService());
TestController controller = new TestController();
HandlerMethod hm = new HandlerMethod(controller,TestController.class.getMethod("notAnnotated"));