Polishing

This commit is contained in:
Arjen Poutsma 2016-02-23 10:24:18 +01:00
parent f036f745a6
commit 51a90465d4
1 changed files with 17 additions and 17 deletions

View File

@ -17,6 +17,7 @@
package org.springframework.http.server.reactive; package org.springframework.http.server.reactive;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
@ -41,6 +42,7 @@ import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator; import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;
/** /**
* @author Arjen Poutsma * @author Arjen Poutsma
@ -80,7 +82,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
servletRequest.getInputStream().setReadListener(requestBody); servletRequest.getInputStream().setReadListener(requestBody);
ResponseBodySubscriber responseBodySubscriber = ResponseBodySubscriber responseBodySubscriber =
new ResponseBodySubscriber(synchronizer, allocator); new ResponseBodySubscriber(synchronizer);
ServletServerHttpResponse response = new ServletServerHttpResponse(servletResponse, ServletServerHttpResponse response = new ServletServerHttpResponse(servletResponse,
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber))); publisher -> Mono.from(subscriber -> publisher.subscribe(responseBodySubscriber)));
servletResponse.getOutputStream().setWriteListener(responseBodySubscriber); servletResponse.getOutputStream().setWriteListener(responseBodySubscriber);
@ -280,18 +282,14 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private final ServletAsyncContextSynchronizer synchronizer; private final ServletAsyncContextSynchronizer synchronizer;
private final DataBufferAllocator allocator;
private Subscription subscription; private Subscription subscription;
private DataBuffer buffer; private DataBuffer dataBuffer;
private volatile boolean subscriberComplete = false; private volatile boolean subscriberComplete = false;
public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer, public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer) {
DataBufferAllocator allocator) {
this.synchronizer = synchronizer; this.synchronizer = synchronizer;
this.allocator = allocator;
} }
@ -303,9 +301,9 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
@Override @Override
public void onNext(DataBuffer bytes) { public void onNext(DataBuffer bytes) {
Assert.isNull(buffer); Assert.isNull(dataBuffer);
this.buffer = bytes; this.dataBuffer = bytes;
try { try {
onWritePossible(); onWritePossible();
} }
@ -316,11 +314,11 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
@Override @Override
public void onComplete() { public void onComplete() {
logger.debug("Complete buffer: " + (buffer == null)); logger.debug("Complete buffer: " + (dataBuffer == null));
this.subscriberComplete = true; this.subscriberComplete = true;
if (buffer == null) { if (dataBuffer == null) {
this.synchronizer.writeComplete(); this.synchronizer.writeComplete();
} }
} }
@ -330,14 +328,16 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
ServletOutputStream output = this.synchronizer.getOutputStream(); ServletOutputStream output = this.synchronizer.getOutputStream();
boolean ready = output.isReady(); boolean ready = output.isReady();
logger.debug("Output: " + ready + " buffer: " + (buffer == null)); logger.debug("Output: " + ready + " buffer: " + (dataBuffer == null));
if (ready) { if (ready) {
if (this.buffer != null) { if (this.dataBuffer != null) {
byte[] bytes = new byte[this.buffer.readableByteCount()]; InputStream in = this.dataBuffer.asInputStream();
this.buffer.read(bytes); byte[] buffer = new byte[BUFFER_SIZE];
this.buffer = null; int bytesRead;
output.write(bytes); while ((bytesRead = in.read(buffer)) != -1) {
output.write(buffer, 0, bytesRead);
}
if (!subscriberComplete) { if (!subscriberComplete) {
this.subscription.request(1); this.subscription.request(1);
} }