diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index 79717c196a5..cfc6fc61270 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -30,8 +30,6 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Operators; -import org.springframework.core.io.buffer.DataBuffer; - /** * Abstract base class for {@code Publisher} implementations that bridge between * event-listener APIs and Reactive Streams. Specifically, base class for the @@ -42,7 +40,7 @@ import org.springframework.core.io.buffer.DataBuffer; * @see ServletServerHttpRequest * @see UndertowHttpHandlerAdapter */ -abstract class AbstractRequestBodyPublisher implements Publisher { +public abstract class AbstractRequestBodyPublisher implements Publisher { protected final Log logger = LogFactory.getLog(getClass()); @@ -50,11 +48,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher { private final AtomicLong demand = new AtomicLong(); - private Subscriber subscriber; + private Subscriber subscriber; @Override - public void subscribe(Subscriber subscriber) { + public void subscribe(Subscriber subscriber) { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " subscribe: " + subscriber); } @@ -66,7 +64,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { * @see ReadListener#onDataAvailable() * @see org.xnio.ChannelListener#handleEvent(Channel) */ - protected final void onDataAvailable() { + public final void onDataAvailable() { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " onDataAvailable"); } @@ -78,7 +76,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { * @see ReadListener#onAllDataRead() * @see org.xnio.ChannelListener#handleEvent(Channel) */ - protected final void onAllDataRead() { + public final void onAllDataRead() { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " onAllDataRead"); } @@ -86,11 +84,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } /** - * Called by a listener interface to indicate that as error has occured. + * Called by a listener interface to indicate that as error has occurred. * @param t the error * @see ReadListener#onError(Throwable) */ - protected final void onError(Throwable t) { + public final void onError(Throwable t) { if (this.logger.isErrorEnabled()) { this.logger.error(this.state + " onError: " + t, t); } @@ -98,16 +96,16 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } /** - * Reads and publishes data buffers from the input. Continues till either there is no + * Reads and publishes data from the input. Continues till either there is no * more demand, or till there is no more data to be read. * @return {@code true} if there is more demand; {@code false} otherwise */ private boolean readAndPublish() throws IOException { while (hasDemand()) { - DataBuffer dataBuffer = read(); - if (dataBuffer != null) { + T data = read(); + if (data != null) { getAndSub(this.demand, 1L); - this.subscriber.onNext(dataBuffer); + this.subscriber.onNext(data); } else { return true; @@ -142,11 +140,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher { protected abstract void checkOnDataAvailable(); /** - * Reads a data buffer from the input, if possible. Returns {@code null} if a buffer + * Reads a data from the input, if possible. Returns {@code null} if a data * could not be read. - * @return the data buffer that was read; or {@code null} + * @return the data that was read; or {@code null} */ - protected abstract DataBuffer read() throws IOException; + protected abstract T read() throws IOException; private boolean hasDemand() { return (this.demand.get() > 0); @@ -159,9 +157,9 @@ abstract class AbstractRequestBodyPublisher implements Publisher { private static final class RequestBodySubscription implements Subscription { - private final AbstractRequestBodyPublisher publisher; + private final AbstractRequestBodyPublisher publisher; - public RequestBodySubscription(AbstractRequestBodyPublisher publisher) { + public RequestBodySubscription(AbstractRequestBodyPublisher publisher) { this.publisher = publisher; } @@ -214,7 +212,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { */ UNSUBSCRIBED { @Override - void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { + void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { Objects.requireNonNull(subscriber); if (publisher.changeState(this, NO_DEMAND)) { Subscription subscription = new RequestBodySubscription(publisher); @@ -235,7 +233,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { */ NO_DEMAND { @Override - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractRequestBodyPublisher publisher, long n) { if (Operators.checkRequest(n, publisher.subscriber)) { Operators.addAndGet(publisher.demand, n); if (publisher.changeState(this, DEMAND)) { @@ -253,14 +251,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher { */ DEMAND { @Override - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractRequestBodyPublisher publisher, long n) { if (Operators.checkRequest(n, publisher.subscriber)) { Operators.addAndGet(publisher.demand, n); } } @Override - void onDataAvailable(AbstractRequestBodyPublisher publisher) { + void onDataAvailable(AbstractRequestBodyPublisher publisher) { if (publisher.changeState(this, READING)) { try { boolean demandAvailable = publisher.readAndPublish(); @@ -281,7 +279,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { READING { @Override - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractRequestBodyPublisher publisher, long n) { if (Operators.checkRequest(n, publisher.subscriber)) { Operators.addAndGet(publisher.demand, n); } @@ -293,40 +291,40 @@ abstract class AbstractRequestBodyPublisher implements Publisher { */ COMPLETED { @Override - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractRequestBodyPublisher publisher, long n) { // ignore } @Override - void cancel(AbstractRequestBodyPublisher publisher) { + void cancel(AbstractRequestBodyPublisher publisher) { // ignore } @Override - void onAllDataRead(AbstractRequestBodyPublisher publisher) { + void onAllDataRead(AbstractRequestBodyPublisher publisher) { // ignore } @Override - void onError(AbstractRequestBodyPublisher publisher, Throwable t) { + void onError(AbstractRequestBodyPublisher publisher, Throwable t) { // ignore } }; - void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { + void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { throw new IllegalStateException(toString()); } - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractRequestBodyPublisher publisher, long n) { throw new IllegalStateException(toString()); } - void cancel(AbstractRequestBodyPublisher publisher) { + void cancel(AbstractRequestBodyPublisher publisher) { publisher.changeState(this, COMPLETED); } - void onDataAvailable(AbstractRequestBodyPublisher publisher) { + void onDataAvailable(AbstractRequestBodyPublisher publisher) { // ignore } - void onAllDataRead(AbstractRequestBodyPublisher publisher) { + void onAllDataRead(AbstractRequestBodyPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { if (publisher.subscriber != null) { publisher.subscriber.onComplete(); @@ -334,7 +332,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } } - void onError(AbstractRequestBodyPublisher publisher, Throwable t) { + void onError(AbstractRequestBodyPublisher publisher, Throwable t) { if (publisher.changeState(this, COMPLETED)) { if (publisher.subscriber != null) { publisher.subscriber.onError(t); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java index f26456ff731..a272c62a36e 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java @@ -29,8 +29,6 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.util.Assert; /** @@ -44,7 +42,7 @@ import org.springframework.util.Assert; * @see UndertowHttpHandlerAdapter * @see ServerHttpResponse#writeWith(Publisher) */ -abstract class AbstractResponseBodyProcessor implements Processor { +public abstract class AbstractResponseBodyProcessor implements Processor { protected final Log logger = LogFactory.getLog(getClass()); @@ -52,7 +50,7 @@ abstract class AbstractResponseBodyProcessor implements Processor state = new AtomicReference<>(State.UNSUBSCRIBED); - private volatile DataBuffer currentBuffer; + protected volatile T currentData; private volatile boolean subscriberCompleted; @@ -70,11 +68,11 @@ abstract class AbstractResponseBodyProcessor implements Processor void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) { Objects.requireNonNull(subscription, "Subscription cannot be null"); if (processor.changeState(this, REQUESTED)) { processor.subscription = subscription; @@ -209,7 +202,7 @@ abstract class AbstractResponseBodyProcessor implements Processor void onNext(AbstractResponseBodyProcessor processor, T data) { + if (processor.isDataEmpty(data)) { processor.subscription.request(1); } else { - processor.receiveBuffer(dataBuffer); + processor.receiveData(data); if (processor.changeState(this, RECEIVED)) { processor.writeIfPossible(); } @@ -230,16 +223,16 @@ abstract class AbstractResponseBodyProcessor implements Processor void onComplete(AbstractResponseBodyProcessor processor) { if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } } }, /** - * State that gets entered after a buffer has been + * State that gets entered after a data has been * {@linkplain Subscriber#onNext(Object) received}. Responds to - * {@code onWritePossible} by writing the current buffer and changes + * {@code onWritePossible} by writing the current data and changes * the state to {@link #WRITING}. If it can be written completely, * changes the state to either {@link #REQUESTED} if the subscription * has not been completed; or {@link #COMPLETED} if it has. If it cannot @@ -248,13 +241,13 @@ abstract class AbstractResponseBodyProcessor implements Processor void onWritePossible(AbstractResponseBodyProcessor processor) { if (processor.changeState(this, WRITING)) { - DataBuffer dataBuffer = processor.currentBuffer; + T data = processor.currentData; try { - boolean writeCompleted = processor.write(dataBuffer); + boolean writeCompleted = processor.write(data); if (writeCompleted) { - processor.releaseBuffer(); + processor.releaseData(); if (!processor.subscriberCompleted) { processor.changeState(WRITING, REQUESTED); processor.subscription.request(1); @@ -277,18 +270,18 @@ abstract class AbstractResponseBodyProcessor implements Processor void onComplete(AbstractResponseBodyProcessor processor) { processor.subscriberCompleted = true; } }, /** - * State that gets entered after a writing of the current buffer has been + * State that gets entered after a writing of the current data has been * {@code onWritePossible started}. */ WRITING { @Override - public void onComplete(AbstractResponseBodyProcessor processor) { + public void onComplete(AbstractResponseBodyProcessor processor) { processor.subscriberCompleted = true; } }, @@ -298,40 +291,40 @@ abstract class AbstractResponseBodyProcessor implements Processor void onNext(AbstractResponseBodyProcessor processor, T data) { // ignore } @Override - public void onError(AbstractResponseBodyProcessor processor, Throwable ex) { + public void onError(AbstractResponseBodyProcessor processor, Throwable ex) { // ignore } @Override - public void onComplete(AbstractResponseBodyProcessor processor) { + public void onComplete(AbstractResponseBodyProcessor processor) { // ignore } }; - public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) { + public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) { subscription.cancel(); } - public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) { + public void onNext(AbstractResponseBodyProcessor processor, T data) { throw new IllegalStateException(toString()); } - public void onError(AbstractResponseBodyProcessor processor, Throwable ex) { + public void onError(AbstractResponseBodyProcessor processor, Throwable ex) { if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishError(ex); } } - public void onComplete(AbstractResponseBodyProcessor processor) { + public void onComplete(AbstractResponseBodyProcessor processor) { throw new IllegalStateException(toString()); } - public void onWritePossible(AbstractResponseBodyProcessor processor) { + public void onWritePossible(AbstractResponseBodyProcessor processor) { // ignore } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 22f012e594d..58ee2f718bb 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -203,7 +203,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { } - private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { + private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { private final RequestBodyPublisher.RequestBodyReadListener readListener = new RequestBodyPublisher.RequestBodyReadListener(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index f6151aa3a7f..20cc5a6019c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -32,6 +32,7 @@ import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseCookie; @@ -183,7 +184,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } - private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { + private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private final ServletOutputStream outputStream; @@ -199,6 +200,20 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons return this.outputStream.isReady(); } + @Override + protected void releaseData() { + if (logger.isTraceEnabled()) { + logger.trace("releaseBuffer: " + this.currentData); + } + DataBufferUtils.release(this.currentData); + this.currentData = null; + } + + @Override + protected boolean isDataEmpty(DataBuffer dataBuffer) { + return dataBuffer.readableByteCount() == 0; + } + @Override protected boolean write(DataBuffer dataBuffer) throws IOException { if (ServletServerHttpResponse.this.flushOnNext) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 2c5fad52183..54dd95af8e3 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -106,7 +106,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return Flux.from(this.body); } - private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { + private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { private final ChannelListener readListener = new ReadListener(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 9aad48d13ff..aaefc898eb2 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -36,6 +36,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.http.ZeroCopyHttpOutputMessage; @@ -138,7 +139,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } - private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { + private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private final ChannelListener listener = new WriteListener(); @@ -187,17 +188,27 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } @Override - protected void receiveBuffer(DataBuffer dataBuffer) { - super.receiveBuffer(dataBuffer); + protected void receiveData(DataBuffer dataBuffer) { + super.receiveData(dataBuffer); this.byteBuffer = dataBuffer.asByteBuffer(); } @Override - protected void releaseBuffer() { - super.releaseBuffer(); + protected void releaseData() { + if (logger.isTraceEnabled()) { + logger.trace("releaseBuffer: " + this.currentData); + } + DataBufferUtils.release(this.currentData); + this.currentData = null; + this.byteBuffer = null; } + @Override + protected boolean isDataEmpty(DataBuffer dataBuffer) { + return dataBuffer.readableByteCount() == 0; + } + private class WriteListener implements ChannelListener { @Override