diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/Jaxb2Decoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/Jaxb2Decoder.java index 462baab1c9e..d0d930c3610 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/Jaxb2Decoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/Jaxb2Decoder.java @@ -42,7 +42,7 @@ import org.springframework.core.ResolvableType; import org.springframework.http.MediaType; import org.springframework.reactive.codec.CodecException; import org.springframework.reactive.codec.encoder.Jaxb2Encoder; -import org.springframework.reactive.io.ByteArrayPublisherInputStream; +import org.springframework.reactive.io.ByteBufferPublisherInputStream; import org.springframework.util.Assert; /** @@ -63,10 +63,9 @@ public class Jaxb2Decoder implements ByteToMessageDecoder { @Override public Publisher decode(Publisher inputStream, ResolvableType type, MediaType mediaType, Object... hints) { - Stream stream = Streams.wrap(inputStream).map(chunk -> new Buffer(chunk).asBytes()); Class outputClass = type.getRawClass(); try { - Source source = processSource(new StreamSource(new ByteArrayPublisherInputStream(stream))); + Source source = processSource(new StreamSource(new ByteBufferPublisherInputStream(inputStream))); Unmarshaller unmarshaller = createUnmarshaller(outputClass); if (outputClass.isAnnotationPresent(XmlRootElement.class)) { return Streams.just(unmarshaller.unmarshal(source)); diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherOutputStream.java b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherOutputStream.java deleted file mode 100644 index 806a13c238c..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherOutputStream.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.io; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; - -import org.reactivestreams.Publisher; -import reactor.rx.Streams; - -/** - * {@code OutputStream} implementation that stores all written bytes, to be retrieved - * using {@link #toByteArrayPublisher()}. - * @author Arjen Poutsma - */ -public class ByteArrayPublisherOutputStream extends OutputStream { - - private final List buffers = new ArrayList<>(); - - - /** - * Returns the written data as a {@code Publisher}. - * @return a publisher for the written bytes - */ - public Publisher toByteArrayPublisher() { - return Streams.from(buffers); - } - - @Override - public void write(int b) throws IOException { - write(new byte[]{(byte) b}); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - byte[] copy = new byte[len - off]; - System.arraycopy(b, off, copy, 0, len); - buffers.add(copy); - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherInputStream.java b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteBufferPublisherInputStream.java similarity index 86% rename from spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherInputStream.java rename to spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteBufferPublisherInputStream.java index 2156f654bf8..e5ccea71e41 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteArrayPublisherInputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/io/ByteBufferPublisherInputStream.java @@ -16,9 +16,9 @@ package org.springframework.reactive.io; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -32,13 +32,14 @@ import org.springframework.util.Assert; /** * {@code InputStream} implementation based on a byte array {@link Publisher}. * @author Arjen Poutsma + * @author Sebastien Deleuze */ -public class ByteArrayPublisherInputStream extends InputStream { +public class ByteBufferPublisherInputStream extends InputStream { - private final BlockingQueue> queue = + private final BlockingQueue> queue = new LinkedBlockingQueue<>(); - private ByteArrayInputStream currentStream; + private ByteBufferInputStream currentStream; private boolean completed; @@ -47,7 +48,7 @@ public class ByteArrayPublisherInputStream extends InputStream { * Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher. * @param publisher the publisher to use */ - public ByteArrayPublisherInputStream(Publisher publisher) { + public ByteBufferPublisherInputStream(Publisher publisher) { this(publisher, 1); } @@ -57,7 +58,7 @@ public class ByteArrayPublisherInputStream extends InputStream { * @param requestSize the {@linkplain Subscription#request(long) request size} to use * on the publisher */ - public ByteArrayPublisherInputStream(Publisher publisher, long requestSize) { + public ByteBufferPublisherInputStream(Publisher publisher, long requestSize) { Assert.notNull(publisher, "'publisher' must not be null"); publisher.subscribe(new BlockingQueueSubscriber(requestSize)); @@ -130,11 +131,11 @@ public class ByteArrayPublisherInputStream extends InputStream { } else { // take() blocks, but that's OK since this is a *blocking* InputStream - PublisherSignal signal = this.queue.take(); + PublisherSignal signal = this.queue.take(); if (signal.isData()) { - byte[] data = signal.data(); - this.currentStream = new ByteArrayInputStream(data); + ByteBuffer data = signal.data(); + this.currentStream = new ByteBufferInputStream(data); return this.currentStream; } else if (signal.isComplete()) { @@ -159,7 +160,7 @@ public class ByteArrayPublisherInputStream extends InputStream { throw new IOException(); } - private class BlockingQueueSubscriber implements Subscriber { + private class BlockingQueueSubscriber implements Subscriber { private final long requestSize; @@ -177,7 +178,7 @@ public class ByteArrayPublisherInputStream extends InputStream { } @Override - public void onNext(byte[] bytes) { + public void onNext(ByteBuffer bytes) { try { queue.put(PublisherSignal.data(bytes)); this.subscription.request(requestSize); diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java index 6a7159d8fd3..ce8d83983e4 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java @@ -78,7 +78,7 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve ByteToMessageDecoder deserializer = resolveDeserializers(request, type, mediaType, hints.toArray()); if (deserializer != null) { - Publisher inputStream = Streams.wrap(request.getBody()).map(bytes -> ByteBuffer.wrap(bytes)); + Publisher inputStream = request.getBody(); List> preProcessors = resolvePreProcessors(request, type, mediaType, hints.toArray()); for (ByteToMessageDecoder preProcessor : preProcessors) { inputStream = preProcessor.decode(inputStream, type, mediaType, hints.toArray()); diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java index 69f8d483c91..387fa657e63 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.List; import org.reactivestreams.Publisher; -import reactor.io.buffer.Buffer; import reactor.rx.Streams; import rx.Observable; import rx.RxReactiveStreams; @@ -125,7 +124,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered outputStream = postProcessor.encode(outputStream, type, mediaType, hints.toArray()); } response.getHeaders().setContentType(mediaType); - return response.writeWith(Streams.wrap(outputStream).map(buffer -> new Buffer(buffer).asBytes())); + return response.writeWith(Streams.wrap(outputStream)); } return Streams.fail(new IllegalStateException( "Return value type not supported: " + returnType)); diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpRequest.java index c2e24d2f9cb..bfbf9771efe 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpRequest.java @@ -15,6 +15,8 @@ */ package org.springframework.reactive.web.http; +import java.nio.ByteBuffer; + import org.reactivestreams.Publisher; /** @@ -23,6 +25,6 @@ import org.reactivestreams.Publisher; */ public interface ServerHttpRequest extends HttpRequest { - Publisher getBody(); + Publisher getBody(); } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpResponse.java index a485b898ebf..3063aa2b2fe 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpResponse.java @@ -15,6 +15,8 @@ */ package org.springframework.reactive.web.http; +import java.nio.ByteBuffer; + import org.reactivestreams.Publisher; import org.springframework.http.HttpStatus; @@ -26,6 +28,6 @@ public interface ServerHttpResponse extends HttpMessage { void setStatusCode(HttpStatus status); - Publisher writeWith(Publisher contentPublisher); + Publisher writeWith(Publisher contentPublisher); } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java index 4c80e78c507..e422bb75020 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java @@ -17,6 +17,7 @@ package org.springframework.reactive.web.http.rxnetty; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf; import io.reactivex.netty.protocol.http.server.HttpServerRequest; @@ -74,12 +75,8 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest { } @Override - public Publisher getBody() { - Observable bytesContent = this.request.getContent().map(byteBuf -> { - byte[] copy = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(copy); - return copy; - }); + public Publisher getBody() { + Observable bytesContent = this.request.getContent().map(byteBuf -> byteBuf.nioBuffer()); return rx.RxReactiveStreams.toPublisher(bytesContent); } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java index ca41e01e55b..f8e941d6399 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java @@ -15,9 +15,12 @@ */ package org.springframework.reactive.web.http.rxnetty; +import java.nio.ByteBuffer; + import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import org.reactivestreams.Publisher; +import reactor.io.buffer.Buffer; import rx.Observable; import rx.RxReactiveStreams; @@ -56,9 +59,9 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { } @Override - public Publisher writeWith(Publisher contentPublisher) { + public Publisher writeWith(Publisher contentPublisher) { writeHeaders(); - Observable contentObservable = RxReactiveStreams.toObservable(contentPublisher); + Observable contentObservable = RxReactiveStreams.toObservable(contentPublisher).map(content -> new Buffer(content).asBytes()); return RxReactiveStreams.toPublisher(this.response.writeBytes(contentObservable)); } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/RequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/RequestBodyPublisher.java index 35cc27fdcbe..8ce7c8c827f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/RequestBodyPublisher.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/RequestBodyPublisher.java @@ -17,6 +17,7 @@ package org.springframework.reactive.web.http.servlet; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; import javax.servlet.ReadListener; @@ -33,7 +34,7 @@ import org.springframework.reactive.util.DemandCounter; /** * @author Arjen Poutsma */ -public class RequestBodyPublisher implements ReadListener, Publisher { +public class RequestBodyPublisher implements ReadListener, Publisher { private final Charset UTF_8 = Charset.forName("UTF-8"); @@ -45,7 +46,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher { private final DemandCounter demand = new DemandCounter(); - private Subscriber subscriber; + private Subscriber subscriber; private boolean stalled; @@ -57,7 +58,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher { } @Override - public void subscribe(Subscriber subscriber) { + public void subscribe(Subscriber subscriber) { if (subscriber == null) { throw new NullPointerException(); } @@ -103,7 +104,7 @@ public class RequestBodyPublisher implements ReadListener, Publisher { // logger.debug("Next: " + new String(copy, UTF_8)); - this.subscriber.onNext(copy); + this.subscriber.onNext(ByteBuffer.wrap(copy)); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ResponseBodySubscriber.java index 39aaf869748..4d7ca620116 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ResponseBodySubscriber.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ResponseBodySubscriber.java @@ -17,6 +17,7 @@ package org.springframework.reactive.web.http.servlet; import java.io.IOException; +import java.nio.ByteBuffer; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; @@ -24,13 +25,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.io.buffer.Buffer; import org.springframework.util.Assert; /** * @author Arjen Poutsma */ -public class ResponseBodySubscriber implements WriteListener, Subscriber { +public class ResponseBodySubscriber implements WriteListener, Subscriber { private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class); @@ -38,7 +40,7 @@ public class ResponseBodySubscriber implements WriteListener, Subscriber private Subscription subscription; - private byte[] buffer; + private ByteBuffer buffer; private volatile boolean subscriberComplete = false; @@ -53,8 +55,7 @@ public class ResponseBodySubscriber implements WriteListener, Subscriber } @Override - public void onNext(byte[] bytes) { - logger.debug("Next: " + bytes.length + " bytes"); + public void onNext(ByteBuffer bytes) { Assert.isNull(buffer); @@ -87,7 +88,7 @@ public class ResponseBodySubscriber implements WriteListener, Subscriber if (ready) { if (this.buffer != null) { - output.write(this.buffer); + output.write(new Buffer(this.buffer).asBytes()); this.buffer = null; if (!subscriberComplete) { diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpRequest.java index 583449e2355..ebeb95d510f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpRequest.java @@ -17,6 +17,7 @@ package org.springframework.reactive.web.http.servlet; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Enumeration; import java.util.Map; @@ -39,12 +40,12 @@ public class ServletServerHttpRequest implements ServerHttpRequest { private final HttpServletRequest servletRequest; - private final Publisher requestBodyPublisher; + private final Publisher requestBodyPublisher; private HttpHeaders headers; - public ServletServerHttpRequest(HttpServletRequest servletRequest, Publisher requestBodyPublisher) { + public ServletServerHttpRequest(HttpServletRequest servletRequest, Publisher requestBodyPublisher) { Assert.notNull(servletRequest, "HttpServletRequest must not be null"); this.servletRequest = servletRequest; this.requestBodyPublisher = requestBodyPublisher; @@ -111,7 +112,7 @@ public class ServletServerHttpRequest implements ServerHttpRequest { } @Override - public Publisher getBody() { + public Publisher getBody() { return this.requestBodyPublisher; } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpResponse.java index f4c43fada86..0895d62b69e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpResponse.java @@ -15,6 +15,7 @@ */ package org.springframework.reactive.web.http.servlet; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import javax.servlet.http.HttpServletResponse; @@ -60,7 +61,7 @@ public class ServletServerHttpResponse implements ServerHttpResponse { } @Override - public Publisher writeWith(final Publisher contentPublisher) { + public Publisher writeWith(final Publisher contentPublisher) { writeHeaders(); return (s -> contentPublisher.subscribe(responseSubscriber)); } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteArrayPublisherInputStreamTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteArrayPublisherInputStreamTests.java deleted file mode 100644 index 362355b0896..00000000000 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/io/ByteArrayPublisherInputStreamTests.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.io; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.Before; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.rx.Stream; -import reactor.rx.Streams; - -import org.springframework.util.FileCopyUtils; - -import static org.junit.Assert.*; - -/** - * @author Arjen Poutsma - */ -public class ByteArrayPublisherInputStreamTests { - - - private ByteArrayPublisherInputStream is; - - @Before - public void createStream() { - Stream stream = - Streams.just(new byte[]{'a', 'b', 'c'}, new byte[]{'d', 'e'}); - - is = new ByteArrayPublisherInputStream(stream); - } - - @Test - public void reactor() throws Exception { - assertEquals(3, is.available()); - - int ch = is.read(); - assertEquals('a', ch); - ch = is.read(); - assertEquals('b', ch); - ch = is.read(); - assertEquals('c', ch); - - assertEquals(2, is.available()); - ch = is.read(); - assertEquals('d', ch); - ch = is.read(); - assertEquals('e', ch); - - ch = is.read(); - assertEquals(-1, ch); - - assertEquals(0, is.available()); - } - - @Test - public void copy() throws Exception { - ByteArrayPublisherOutputStream os = new ByteArrayPublisherOutputStream(); - - FileCopyUtils.copy(is, os); - - Publisher publisher = os.toByteArrayPublisher(); - List result = new ArrayList<>(); - AtomicBoolean complete = new AtomicBoolean(); - - publisher.subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(byte[] bytes) { - result.add(bytes); - } - - @Override - public void onError(Throwable t) { - fail(t.getMessage()); - } - - @Override - public void onComplete() { - complete.set(true); - } - }); - - while (!complete.get()) { - - } - assertArrayEquals(result.get(0), new byte[]{'a', 'b', 'c'}); - assertArrayEquals(result.get(1), new byte[]{'d', 'e'}); - - } - -} \ No newline at end of file diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/handler/SimpleUrlHandlerMappingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/handler/SimpleUrlHandlerMappingIntegrationTests.java index 8bbe9841fb2..b9700a02f13 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/handler/SimpleUrlHandlerMappingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/handler/SimpleUrlHandlerMappingIntegrationTests.java @@ -22,6 +22,7 @@ import java.util.Map; import org.junit.Test; import org.reactivestreams.Publisher; +import reactor.io.buffer.Buffer; import reactor.rx.Streams; import org.springframework.http.RequestEntity; @@ -97,7 +98,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler @Override public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { - return response.writeWith(Streams.just("foo".getBytes(UTF_8))); + return response.writeWith(Streams.just(Buffer.wrap("foo").byteBuffer())); } } @@ -105,7 +106,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler @Override public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { - return response.writeWith(Streams.just("bar".getBytes(UTF_8))); + return response.writeWith(Streams.just(Buffer.wrap("bar").byteBuffer())); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/RandomHandler.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/RandomHandler.java index 111b00b70bd..69244b098c3 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/RandomHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/RandomHandler.java @@ -16,6 +16,7 @@ package org.springframework.reactive.web.http; +import java.nio.ByteBuffer; import java.util.Random; import org.apache.commons.logging.Log; @@ -23,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.io.buffer.Buffer; import reactor.rx.Streams; import static org.junit.Assert.assertEquals; @@ -41,7 +43,7 @@ public class RandomHandler implements HttpHandler { @Override public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { - request.getBody().subscribe(new Subscriber() { + request.getBody().subscribe(new Subscriber() { private Subscription s; private int requestSize = 0; @@ -53,8 +55,8 @@ public class RandomHandler implements HttpHandler { } @Override - public void onNext(byte[] bytes) { - requestSize += bytes.length; + public void onNext(ByteBuffer bytes) { + requestSize += new Buffer(bytes).limit(); } @Override @@ -71,7 +73,7 @@ public class RandomHandler implements HttpHandler { }); response.getHeaders().setContentLength(RESPONSE_SIZE); - return response.writeWith(Streams.just(randomBytes())); + return response.writeWith(Streams.just(ByteBuffer.wrap(randomBytes()))); } private byte[] randomBytes() { diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/XmlHandler.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/XmlHandler.java index 391f6acbffc..dcf95a8dc2a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/XmlHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/XmlHandler.java @@ -23,13 +23,14 @@ import javax.xml.bind.Unmarshaller; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; -import reactor.rx.Streams; import org.springframework.http.MediaType; -import org.springframework.reactive.io.ByteArrayPublisherInputStream; -import org.springframework.reactive.io.ByteArrayPublisherOutputStream; +import org.springframework.reactive.io.BufferOutputStream; +import org.springframework.reactive.io.ByteBufferPublisherInputStream; import static org.junit.Assert.fail; +import reactor.io.buffer.Buffer; +import reactor.rx.Streams; /** * @author Arjen Poutsma @@ -48,7 +49,7 @@ public class XmlHandler implements HttpHandler { Runnable r = () -> { try { - ByteArrayPublisherInputStream bis = new ByteArrayPublisherInputStream(request.getBody()); + ByteBufferPublisherInputStream bis = new ByteBufferPublisherInputStream(request.getBody()); XmlHandlerIntegrationTests.Person johnDoe = (XmlHandlerIntegrationTests.Person) unmarshaller.unmarshal(bis); @@ -66,11 +67,13 @@ public class XmlHandler implements HttpHandler { response.getHeaders().setContentType(MediaType.APPLICATION_XML); XmlHandlerIntegrationTests.Person janeDoe = new XmlHandlerIntegrationTests.Person("Jane Doe"); - ByteArrayPublisherOutputStream bos = new ByteArrayPublisherOutputStream(); + Buffer buffer = new Buffer(); + BufferOutputStream bos = new BufferOutputStream(buffer); marshaller.marshal(janeDoe, bos); bos.close(); + buffer.flip(); - return response.writeWith(bos.toByteArrayPublisher()); + return response.writeWith(Streams.just(buffer.byteBuffer())); } catch (Exception ex) { logger.error(ex, ex);