Use ByteBuffer instead of byte[]

This commit is contained in:
Sebastien Deleuze 2015-09-18 16:39:23 -04:00
parent 9cbe984598
commit 110d9d7cd9
17 changed files with 65 additions and 224 deletions

View File

@ -42,7 +42,7 @@ import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.reactive.codec.CodecException;
import org.springframework.reactive.codec.encoder.Jaxb2Encoder;
import org.springframework.reactive.io.ByteArrayPublisherInputStream;
import org.springframework.reactive.io.ByteBufferPublisherInputStream;
import org.springframework.util.Assert;
/**
@ -63,10 +63,9 @@ public class Jaxb2Decoder implements ByteToMessageDecoder<Object> {
@Override
public Publisher<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MediaType mediaType, Object... hints) {
Stream<byte[]> stream = Streams.wrap(inputStream).map(chunk -> new Buffer(chunk).asBytes());
Class<?> outputClass = type.getRawClass();
try {
Source source = processSource(new StreamSource(new ByteArrayPublisherInputStream(stream)));
Source source = processSource(new StreamSource(new ByteBufferPublisherInputStream(inputStream)));
Unmarshaller unmarshaller = createUnmarshaller(outputClass);
if (outputClass.isAnnotationPresent(XmlRootElement.class)) {
return Streams.just(unmarshaller.unmarshal(source));

View File

@ -1,57 +0,0 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.io;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.rx.Streams;
/**
* {@code OutputStream} implementation that stores all written bytes, to be retrieved
* using {@link #toByteArrayPublisher()}.
* @author Arjen Poutsma
*/
public class ByteArrayPublisherOutputStream extends OutputStream {
private final List<byte[]> buffers = new ArrayList<>();
/**
* Returns the written data as a {@code Publisher}.
* @return a publisher for the written bytes
*/
public Publisher<byte[]> toByteArrayPublisher() {
return Streams.from(buffers);
}
@Override
public void write(int b) throws IOException {
write(new byte[]{(byte) b});
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
byte[] copy = new byte[len - off];
System.arraycopy(b, off, copy, 0, len);
buffers.add(copy);
}
}

View File

@ -16,9 +16,9 @@
package org.springframework.reactive.io;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -32,13 +32,14 @@ import org.springframework.util.Assert;
/**
* {@code InputStream} implementation based on a byte array {@link Publisher}.
* @author Arjen Poutsma
* @author Sebastien Deleuze
*/
public class ByteArrayPublisherInputStream extends InputStream {
public class ByteBufferPublisherInputStream extends InputStream {
private final BlockingQueue<PublisherSignal<byte[]>> queue =
private final BlockingQueue<PublisherSignal<ByteBuffer>> queue =
new LinkedBlockingQueue<>();
private ByteArrayInputStream currentStream;
private ByteBufferInputStream currentStream;
private boolean completed;
@ -47,7 +48,7 @@ public class ByteArrayPublisherInputStream extends InputStream {
* Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher.
* @param publisher the publisher to use
*/
public ByteArrayPublisherInputStream(Publisher<byte[]> publisher) {
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher) {
this(publisher, 1);
}
@ -57,7 +58,7 @@ public class ByteArrayPublisherInputStream extends InputStream {
* @param requestSize the {@linkplain Subscription#request(long) request size} to use
* on the publisher
*/
public ByteArrayPublisherInputStream(Publisher<byte[]> publisher, long requestSize) {
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher, long requestSize) {
Assert.notNull(publisher, "'publisher' must not be null");
publisher.subscribe(new BlockingQueueSubscriber(requestSize));
@ -130,11 +131,11 @@ public class ByteArrayPublisherInputStream extends InputStream {
}
else {
// take() blocks, but that's OK since this is a *blocking* InputStream
PublisherSignal<byte[]> signal = this.queue.take();
PublisherSignal<ByteBuffer> signal = this.queue.take();
if (signal.isData()) {
byte[] data = signal.data();
this.currentStream = new ByteArrayInputStream(data);
ByteBuffer data = signal.data();
this.currentStream = new ByteBufferInputStream(data);
return this.currentStream;
}
else if (signal.isComplete()) {
@ -159,7 +160,7 @@ public class ByteArrayPublisherInputStream extends InputStream {
throw new IOException();
}
private class BlockingQueueSubscriber implements Subscriber<byte[]> {
private class BlockingQueueSubscriber implements Subscriber<ByteBuffer> {
private final long requestSize;
@ -177,7 +178,7 @@ public class ByteArrayPublisherInputStream extends InputStream {
}
@Override
public void onNext(byte[] bytes) {
public void onNext(ByteBuffer bytes) {
try {
queue.put(PublisherSignal.data(bytes));
this.subscription.request(requestSize);

View File

@ -78,7 +78,7 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
ByteToMessageDecoder<?> deserializer = resolveDeserializers(request, type, mediaType, hints.toArray());
if (deserializer != null) {
Publisher<ByteBuffer> inputStream = Streams.wrap(request.getBody()).map(bytes -> ByteBuffer.wrap(bytes));
Publisher<ByteBuffer> inputStream = request.getBody();
List<ByteToMessageDecoder<ByteBuffer>> preProcessors = resolvePreProcessors(request, type, mediaType, hints.toArray());
for (ByteToMessageDecoder<ByteBuffer> preProcessor : preProcessors) {
inputStream = preProcessor.decode(inputStream, type, mediaType, hints.toArray());

View File

@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
@ -125,7 +124,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
outputStream = postProcessor.encode(outputStream, type, mediaType, hints.toArray());
}
response.getHeaders().setContentType(mediaType);
return response.writeWith(Streams.wrap(outputStream).map(buffer -> new Buffer(buffer).asBytes()));
return response.writeWith(Streams.wrap(outputStream));
}
return Streams.fail(new IllegalStateException(
"Return value type not supported: " + returnType));

View File

@ -15,6 +15,8 @@
*/
package org.springframework.reactive.web.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
@ -23,6 +25,6 @@ import org.reactivestreams.Publisher;
*/
public interface ServerHttpRequest extends HttpRequest {
Publisher<byte[]> getBody();
Publisher<ByteBuffer> getBody();
}

View File

@ -15,6 +15,8 @@
*/
package org.springframework.reactive.web.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpStatus;
@ -26,6 +28,6 @@ public interface ServerHttpResponse extends HttpMessage {
void setStatusCode(HttpStatus status);
Publisher<Void> writeWith(Publisher<byte[]> contentPublisher);
Publisher<Void> writeWith(Publisher<ByteBuffer> contentPublisher);
}

View File

@ -17,6 +17,7 @@ package org.springframework.reactive.web.http.rxnetty;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
@ -74,12 +75,8 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest {
}
@Override
public Publisher<byte[]> getBody() {
Observable<byte[]> bytesContent = this.request.getContent().map(byteBuf -> {
byte[] copy = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(copy);
return copy;
});
public Publisher<ByteBuffer> getBody() {
Observable<ByteBuffer> bytesContent = this.request.getContent().map(byteBuf -> byteBuf.nioBuffer());
return rx.RxReactiveStreams.toPublisher(bytesContent);
}

View File

@ -15,9 +15,12 @@
*/
package org.springframework.reactive.web.http.rxnetty;
import java.nio.ByteBuffer;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import org.reactivestreams.Publisher;
import reactor.io.buffer.Buffer;
import rx.Observable;
import rx.RxReactiveStreams;
@ -56,9 +59,9 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> writeWith(Publisher<byte[]> contentPublisher) {
public Publisher<Void> writeWith(Publisher<ByteBuffer> contentPublisher) {
writeHeaders();
Observable<byte[]> contentObservable = RxReactiveStreams.toObservable(contentPublisher);
Observable<byte[]> contentObservable = RxReactiveStreams.toObservable(contentPublisher).map(content -> new Buffer(content).asBytes());
return RxReactiveStreams.toPublisher(this.response.writeBytes(contentObservable));
}

View File

@ -17,6 +17,7 @@
package org.springframework.reactive.web.http.servlet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import javax.servlet.ReadListener;
@ -33,7 +34,7 @@ import org.springframework.reactive.util.DemandCounter;
/**
* @author Arjen Poutsma
*/
public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
public class RequestBodyPublisher implements ReadListener, Publisher<ByteBuffer> {
private final Charset UTF_8 = Charset.forName("UTF-8");
@ -45,7 +46,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
private final DemandCounter demand = new DemandCounter();
private Subscriber<? super byte[]> subscriber;
private Subscriber<? super ByteBuffer> subscriber;
private boolean stalled;
@ -57,7 +58,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
}
@Override
public void subscribe(Subscriber<? super byte[]> subscriber) {
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
if (subscriber == null) {
throw new NullPointerException();
}
@ -103,7 +104,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
// logger.debug("Next: " + new String(copy, UTF_8));
this.subscriber.onNext(copy);
this.subscriber.onNext(ByteBuffer.wrap(copy));
}
}

View File

@ -17,6 +17,7 @@
package org.springframework.reactive.web.http.servlet;
import java.io.IOException;
import java.nio.ByteBuffer;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
@ -24,13 +25,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.io.buffer.Buffer;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
public class ResponseBodySubscriber implements WriteListener, Subscriber<byte[]> {
public class ResponseBodySubscriber implements WriteListener, Subscriber<ByteBuffer> {
private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class);
@ -38,7 +40,7 @@ public class ResponseBodySubscriber implements WriteListener, Subscriber<byte[]>
private Subscription subscription;
private byte[] buffer;
private ByteBuffer buffer;
private volatile boolean subscriberComplete = false;
@ -53,8 +55,7 @@ public class ResponseBodySubscriber implements WriteListener, Subscriber<byte[]>
}
@Override
public void onNext(byte[] bytes) {
logger.debug("Next: " + bytes.length + " bytes");
public void onNext(ByteBuffer bytes) {
Assert.isNull(buffer);
@ -87,7 +88,7 @@ public class ResponseBodySubscriber implements WriteListener, Subscriber<byte[]>
if (ready) {
if (this.buffer != null) {
output.write(this.buffer);
output.write(new Buffer(this.buffer).asBytes());
this.buffer = null;
if (!subscriberComplete) {

View File

@ -17,6 +17,7 @@ package org.springframework.reactive.web.http.servlet;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.Map;
@ -39,12 +40,12 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
private final HttpServletRequest servletRequest;
private final Publisher<byte[]> requestBodyPublisher;
private final Publisher<ByteBuffer> requestBodyPublisher;
private HttpHeaders headers;
public ServletServerHttpRequest(HttpServletRequest servletRequest, Publisher<byte[]> requestBodyPublisher) {
public ServletServerHttpRequest(HttpServletRequest servletRequest, Publisher<ByteBuffer> requestBodyPublisher) {
Assert.notNull(servletRequest, "HttpServletRequest must not be null");
this.servletRequest = servletRequest;
this.requestBodyPublisher = requestBodyPublisher;
@ -111,7 +112,7 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
}
@Override
public Publisher<byte[]> getBody() {
public Publisher<ByteBuffer> getBody() {
return this.requestBodyPublisher;
}

View File

@ -15,6 +15,7 @@
*/
package org.springframework.reactive.web.http.servlet;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
@ -60,7 +61,7 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> writeWith(final Publisher<byte[]> contentPublisher) {
public Publisher<Void> writeWith(final Publisher<ByteBuffer> contentPublisher) {
writeHeaders();
return (s -> contentPublisher.subscribe(responseSubscriber));
}

View File

@ -1,115 +0,0 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.io;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.rx.Stream;
import reactor.rx.Streams;
import org.springframework.util.FileCopyUtils;
import static org.junit.Assert.*;
/**
* @author Arjen Poutsma
*/
public class ByteArrayPublisherInputStreamTests {
private ByteArrayPublisherInputStream is;
@Before
public void createStream() {
Stream<byte[]> stream =
Streams.just(new byte[]{'a', 'b', 'c'}, new byte[]{'d', 'e'});
is = new ByteArrayPublisherInputStream(stream);
}
@Test
public void reactor() throws Exception {
assertEquals(3, is.available());
int ch = is.read();
assertEquals('a', ch);
ch = is.read();
assertEquals('b', ch);
ch = is.read();
assertEquals('c', ch);
assertEquals(2, is.available());
ch = is.read();
assertEquals('d', ch);
ch = is.read();
assertEquals('e', ch);
ch = is.read();
assertEquals(-1, ch);
assertEquals(0, is.available());
}
@Test
public void copy() throws Exception {
ByteArrayPublisherOutputStream os = new ByteArrayPublisherOutputStream();
FileCopyUtils.copy(is, os);
Publisher<byte[]> publisher = os.toByteArrayPublisher();
List<byte[]> result = new ArrayList<>();
AtomicBoolean complete = new AtomicBoolean();
publisher.subscribe(new Subscriber<byte[]>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(byte[] bytes) {
result.add(bytes);
}
@Override
public void onError(Throwable t) {
fail(t.getMessage());
}
@Override
public void onComplete() {
complete.set(true);
}
});
while (!complete.get()) {
}
assertArrayEquals(result.get(0), new byte[]{'a', 'b', 'c'});
assertArrayEquals(result.get(1), new byte[]{'d', 'e'});
}
}

View File

@ -22,6 +22,7 @@ import java.util.Map;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
import org.springframework.http.RequestEntity;
@ -97,7 +98,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.writeWith(Streams.just("foo".getBytes(UTF_8)));
return response.writeWith(Streams.just(Buffer.wrap("foo").byteBuffer()));
}
}
@ -105,7 +106,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.writeWith(Streams.just("bar".getBytes(UTF_8)));
return response.writeWith(Streams.just(Buffer.wrap("bar").byteBuffer()));
}
}

View File

@ -16,6 +16,7 @@
package org.springframework.reactive.web.http;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -23,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
import static org.junit.Assert.assertEquals;
@ -41,7 +43,7 @@ public class RandomHandler implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
request.getBody().subscribe(new Subscriber<byte[]>() {
request.getBody().subscribe(new Subscriber<ByteBuffer>() {
private Subscription s;
private int requestSize = 0;
@ -53,8 +55,8 @@ public class RandomHandler implements HttpHandler {
}
@Override
public void onNext(byte[] bytes) {
requestSize += bytes.length;
public void onNext(ByteBuffer bytes) {
requestSize += new Buffer(bytes).limit();
}
@Override
@ -71,7 +73,7 @@ public class RandomHandler implements HttpHandler {
});
response.getHeaders().setContentLength(RESPONSE_SIZE);
return response.writeWith(Streams.<byte[]>just(randomBytes()));
return response.writeWith(Streams.just(ByteBuffer.wrap(randomBytes())));
}
private byte[] randomBytes() {

View File

@ -23,13 +23,14 @@ import javax.xml.bind.Unmarshaller;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.rx.Streams;
import org.springframework.http.MediaType;
import org.springframework.reactive.io.ByteArrayPublisherInputStream;
import org.springframework.reactive.io.ByteArrayPublisherOutputStream;
import org.springframework.reactive.io.BufferOutputStream;
import org.springframework.reactive.io.ByteBufferPublisherInputStream;
import static org.junit.Assert.fail;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
/**
* @author Arjen Poutsma
@ -48,7 +49,7 @@ public class XmlHandler implements HttpHandler {
Runnable r = () -> {
try {
ByteArrayPublisherInputStream bis = new ByteArrayPublisherInputStream(request.getBody());
ByteBufferPublisherInputStream bis = new ByteBufferPublisherInputStream(request.getBody());
XmlHandlerIntegrationTests.Person johnDoe =
(XmlHandlerIntegrationTests.Person) unmarshaller.unmarshal(bis);
@ -66,11 +67,13 @@ public class XmlHandler implements HttpHandler {
response.getHeaders().setContentType(MediaType.APPLICATION_XML);
XmlHandlerIntegrationTests.Person janeDoe = new XmlHandlerIntegrationTests.Person("Jane Doe");
ByteArrayPublisherOutputStream bos = new ByteArrayPublisherOutputStream();
Buffer buffer = new Buffer();
BufferOutputStream bos = new BufferOutputStream(buffer);
marshaller.marshal(janeDoe, bos);
bos.close();
buffer.flip();
return response.writeWith(bos.toByteArrayPublisher());
return response.writeWith(Streams.just(buffer.byteBuffer()));
}
catch (Exception ex) {
logger.error(ex, ex);