diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 213164902a..048fa38014 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -17,6 +17,7 @@ package org.springframework.http.server.reactive; import java.io.IOException; +import java.io.InputStream; import java.util.concurrent.atomic.AtomicLong; import javax.servlet.AsyncContext; import javax.servlet.ReadListener; @@ -41,6 +42,7 @@ import org.springframework.core.io.buffer.DataBufferAllocator; import org.springframework.core.io.buffer.DefaultDataBufferAllocator; import org.springframework.http.HttpStatus; import org.springframework.util.Assert; +import org.springframework.util.StreamUtils; /** * @author Arjen Poutsma @@ -80,7 +82,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet { servletRequest.getInputStream().setReadListener(requestBody); ResponseBodySubscriber responseBodySubscriber = - new ResponseBodySubscriber(synchronizer, allocator); + new ResponseBodySubscriber(synchronizer); ServletServerHttpResponse response = new ServletServerHttpResponse(servletResponse, publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber))); servletResponse.getOutputStream().setWriteListener(responseBodySubscriber); @@ -280,18 +282,14 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private final ServletAsyncContextSynchronizer synchronizer; - private final DataBufferAllocator allocator; - private Subscription subscription; - private DataBuffer buffer; + private DataBuffer dataBuffer; private volatile boolean subscriberComplete = false; - public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer, - DataBufferAllocator allocator) { + public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer) { this.synchronizer = synchronizer; - this.allocator = allocator; } @@ -303,9 +301,9 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @Override public void onNext(DataBuffer bytes) { - Assert.isNull(buffer); + Assert.isNull(dataBuffer); - this.buffer = bytes; + this.dataBuffer = bytes; try { onWritePossible(); } @@ -316,11 +314,11 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @Override public void onComplete() { - logger.debug("Complete buffer: " + (buffer == null)); + logger.debug("Complete buffer: " + (dataBuffer == null)); this.subscriberComplete = true; - if (buffer == null) { + if (dataBuffer == null) { this.synchronizer.writeComplete(); } } @@ -330,14 +328,16 @@ public class ServletHttpHandlerAdapter extends HttpServlet { ServletOutputStream output = this.synchronizer.getOutputStream(); boolean ready = output.isReady(); - logger.debug("Output: " + ready + " buffer: " + (buffer == null)); + logger.debug("Output: " + ready + " buffer: " + (dataBuffer == null)); if (ready) { - if (this.buffer != null) { - byte[] bytes = new byte[this.buffer.readableByteCount()]; - this.buffer.read(bytes); - this.buffer = null; - output.write(bytes); + if (this.dataBuffer != null) { + InputStream in = this.dataBuffer.asInputStream(); + byte[] buffer = new byte[BUFFER_SIZE]; + int bytesRead; + while ((bytesRead = in.read(buffer)) != -1) { + output.write(buffer, 0, bytesRead); + } if (!subscriberComplete) { this.subscription.request(1); }