diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 262751b773..6bde76013d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -124,6 +124,29 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ protected abstract void flush() throws IOException; + /** + * Whether writing is possible. + */ + protected abstract boolean isWritePossible(); + + /** + * Whether flushing is pending. + */ + protected abstract boolean isFlushPending(); + + /** + * Listeners can call this to notify when flushing is possible. + */ + protected final void onFlushPossible() { + this.state.get().onFlushPossible(this); + } + + private void flushIfPossible() { + if (isWritePossible()) { + onFlushPossible(); + } + } + private boolean changeState(State oldState, State newState) { return this.state.compareAndSet(oldState, newState); @@ -181,7 +204,12 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo return; } if (processor.subscriberCompleted) { - if (processor.changeState(this, COMPLETED)) { + if (processor.isFlushPending()) { + // Ensure the final flush + processor.changeState(this, FLUSHING); + processor.flushIfPossible(); + } + else if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } } @@ -198,6 +226,28 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } }, + FLUSHING { + public void onFlushPossible(AbstractListenerWriteFlushProcessor processor) { + try { + processor.flush(); + } + catch (IOException ex) { + processor.flushingFailed(ex); + return; + } + if (processor.changeState(this, COMPLETED)) { + processor.resultPublisher.publishComplete(); + } + } + public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { + // ignore + } + @Override + public void onComplete(AbstractListenerWriteFlushProcessor processor) { + // ignore + } + }, + COMPLETED { @Override public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { @@ -235,6 +285,10 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo // ignore } + public void onFlushPossible(AbstractListenerWriteFlushProcessor processor) { + // ignore + } + private static class WriteSubscriber implements Subscriber { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index a4a02ad67a..05af6af643 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -18,7 +18,6 @@ package org.springframework.http.server.reactive; import java.io.IOException; import java.io.InputStream; -import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.util.List; import java.util.Map; @@ -52,6 +51,8 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons private final HttpServletResponse response; + private final ServletOutputStream outputStream; + private final int bufferSize; @Nullable @@ -73,6 +74,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons Assert.isTrue(bufferSize > 0, "Buffer size must be greater than 0"); this.response = response; + this.outputStream = response.getOutputStream(); this.bufferSize = bufferSize; asyncContext.addListener(new ResponseAsyncListener()); @@ -147,7 +149,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons * @return the number of bytes written */ protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - ServletOutputStream outputStream = response.getOutputStream(); + ServletOutputStream outputStream = this.outputStream; InputStream input = dataBuffer.asInputStream(); int bytesWritten = 0; byte[] buffer = new byte[this.bufferSize]; @@ -160,7 +162,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } private void flush() throws IOException { - ServletOutputStream outputStream = this.response.getOutputStream(); + ServletOutputStream outputStream = this.outputStream; if (outputStream.isReady()) { try { outputStream.flush(); @@ -176,6 +178,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } + private boolean isWritePossible() { + return this.outputStream.isReady(); + } + private final class ResponseAsyncListener implements AsyncListener { @@ -233,6 +239,12 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons if (processor != null) { processor.onWritePossible(); } + else { + ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor; + if (flushProcessor != null) { + flushProcessor.onFlushPossible(); + } + } } @Override @@ -242,6 +254,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons processor.cancel(); processor.onError(ex); } + else { + ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor; + if (flushProcessor != null) { + flushProcessor.cancel(); + flushProcessor.onError(ex); + } + } } } @@ -250,15 +269,9 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons @Override protected Processor createWriteProcessor() { - try { - ServletOutputStream outputStream = response.getOutputStream(); - ResponseBodyProcessor processor = new ResponseBodyProcessor(outputStream); - bodyProcessor = processor; - return processor; - } - catch (IOException ex) { - throw new UncheckedIOException(ex); - } + ResponseBodyProcessor processor = new ResponseBodyProcessor(); + bodyProcessor = processor; + return processor; } @Override @@ -268,20 +281,24 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } ServletServerHttpResponse.this.flush(); } + + @Override + protected boolean isWritePossible() { + return ServletServerHttpResponse.this.isWritePossible(); + } + + @Override + protected boolean isFlushPending() { + return flushOnNext; + } } private class ResponseBodyProcessor extends AbstractListenerWriteProcessor { - private final ServletOutputStream outputStream; - - public ResponseBodyProcessor(ServletOutputStream outputStream) { - this.outputStream = outputStream; - } - @Override protected boolean isWritePossible() { - return this.outputStream.isReady(); + return ServletServerHttpResponse.this.isWritePossible(); } @Override @@ -306,7 +323,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } flush(); } - boolean ready = this.outputStream.isReady(); + boolean ready = ServletServerHttpResponse.this.isWritePossible(); if (this.logger.isTraceEnabled()) { this.logger.trace("write: " + dataBuffer + " ready: " + ready); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index ebc9fb2339..becbb59715 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -147,8 +147,20 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon return new ResponseBodyProcessor(this.responseChannel); } + private boolean isWritePossible() { + if (this.responseChannel == null) { + this.responseChannel = this.exchange.getResponseChannel(); + } + if (this.responseChannel.isWriteResumed()) { + return true; + } else { + this.responseChannel.resumeWrites(); + return false; + } + } - private static class ResponseBodyProcessor extends AbstractListenerWriteProcessor { + + private class ResponseBodyProcessor extends AbstractListenerWriteProcessor { private final StreamSinkChannel channel; @@ -164,12 +176,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon @Override protected boolean isWritePossible() { - if (this.channel.isWriteResumed()) { - return true; - } else { - this.channel.resumeWrites(); - return false; - } + return UndertowServerHttpResponse.this.isWritePossible(); } @Override @@ -264,6 +271,16 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon cancel(); onError(t); } + + @Override + protected boolean isWritePossible() { + return UndertowServerHttpResponse.this.isWritePossible(); + } + + @Override + protected boolean isFlushPending() { + return false; + } } }