diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index 412ecdf8dbb..049b3c837bc 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -52,6 +52,8 @@ abstract class AbstractRequestBodyPublisher implements Publisher { private Subscriber subscriber; + private volatile boolean dataAvailable; + @Override public void subscribe(Subscriber subscriber) { if (this.logger.isTraceEnabled()) { @@ -199,7 +201,9 @@ abstract class AbstractRequestBodyPublisher implements Publisher { void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { Objects.requireNonNull(subscriber); - if (publisher.changeState(this, DATA_UNAVAILABLE)) { + State newState = + publisher.dataAvailable ? DATA_AVAILABLE : DATA_UNAVAILABLE; + if (publisher.changeState(this, newState)) { Subscription subscription = new RequestBodySubscription( publisher); publisher.subscriber = subscriber; @@ -209,6 +213,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher { throw new IllegalStateException(toString()); } } + + @Override + void onDataAvailable(AbstractRequestBodyPublisher publisher) { + publisher.dataAvailable = true; + } }, /** * State that gets entered when there is no data to be read. Responds to {@link @@ -252,20 +261,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } } - @Override - void onDataAvailable(AbstractRequestBodyPublisher publisher) { - // ignore - } }, /** * The terminal completed state. Does not respond to any events. */ COMPLETED { - @Override - void subscribe(AbstractRequestBodyPublisher publisher, - Subscriber subscriber) { - // ignore - } @Override void request(AbstractRequestBodyPublisher publisher, long n) { @@ -277,11 +277,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher { // ignore } - @Override - void onDataAvailable(AbstractRequestBodyPublisher publisher) { - // ignore - } - @Override void onAllDataRead(AbstractRequestBodyPublisher publisher) { // ignore @@ -309,7 +304,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } void onDataAvailable(AbstractRequestBodyPublisher publisher) { - throw new IllegalStateException(toString()); + // ignore } void onAllDataRead(AbstractRequestBodyPublisher publisher) { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java new file mode 100644 index 00000000000..c187a5c49d1 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java @@ -0,0 +1,486 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.IOException; +import java.nio.channels.Channel; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.WriteListener; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Processor; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.util.BackpressureUtils; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.FlushingDataBuffer; +import org.springframework.core.io.buffer.support.DataBufferUtils; +import org.springframework.util.Assert; + +/** + * Abstract base class for {@code Subscriber} implementations that bridge between + * event-listener APIs and Reactive Streams. Specifically, base class for the Servlet 3.1 + * and Undertow support. + * @author Arjen Poutsma + * @see ServletServerHttpRequest + * @see UndertowHttpHandlerAdapter + */ +abstract class AbstractResponseBodyProcessor implements Processor { + + protected final Log logger = LogFactory.getLog(getClass()); + + private final AtomicReference subscriberState = + new AtomicReference<>(SubscriberState.UNSUBSCRIBED); + + private final AtomicReference publisherState = + new AtomicReference<>(PublisherState.UNSUBSCRIBED); + + private volatile DataBuffer currentBuffer; + + private volatile boolean subscriberCompleted; + + private volatile boolean publisherCompleted; + + private volatile Throwable publisherError; + + private Subscription subscription; + + private Subscriber subscriber; + + // Subscriber + + @Override + public final void onSubscribe(Subscription subscription) { + if (logger.isTraceEnabled()) { + logger.trace("SUB " + this.subscriberState + " onSubscribe: " + subscription); + } + this.subscriberState.get().onSubscribe(this, subscription); + } + + @Override + public final void onNext(DataBuffer dataBuffer) { + if (logger.isTraceEnabled()) { + logger.trace("SUB " + this.subscriberState + " onNext: " + dataBuffer); + } + this.subscriberState.get().onNext(this, dataBuffer); + } + + @Override + public final void onError(Throwable t) { + if (logger.isErrorEnabled()) { + logger.error("SUB " + this.subscriberState + " publishError: " + t, t); + } + this.subscriberState.get().onError(this, t); + } + + @Override + public final void onComplete() { + if (logger.isTraceEnabled()) { + logger.trace("SUB " + this.subscriberState + " onComplete"); + } + this.subscriberState.get().onComplete(this); + } + + // Publisher + + @Override + public final void subscribe(Subscriber subscriber) { + if (logger.isTraceEnabled()) { + logger.trace("PUB " + this.publisherState + " subscribe: " + subscriber); + } + this.publisherState.get().subscribe(this, subscriber); + } + + private void publishComplete() { + if (logger.isTraceEnabled()) { + logger.trace("PUB " + this.publisherState + " publishComplete"); + } + this.publisherState.get().publishComplete(this); + } + + private void publishError(Throwable t) { + if (logger.isTraceEnabled()) { + logger.trace("PUB " + this.publisherState + " publishError: " + t); + } + this.publisherState.get().publishError(this, t); + } + + // listener methods + + /** + * Called via a listener interface to indicate that writing is possible. + * @see WriteListener#onWritePossible() + * @see org.xnio.ChannelListener#handleEvent(Channel) + */ + protected final void onWritePossible() { + this.subscriberState.get().onWritePossible(this); + } + + /** + * Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)} + * @param dataBuffer the buffer that was received. + */ + protected void receiveBuffer(DataBuffer dataBuffer) { + Assert.state(this.currentBuffer == null); + this.currentBuffer = dataBuffer; + + checkOnWritePossible(); + } + + /** + * Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)} + * or when only partial data from the {@link DataBuffer} was written. + */ + protected void checkOnWritePossible() { + // no-op + } + + /** + * Called when the current buffer should be + * {@linkplain DataBufferUtils#release(DataBuffer) released}. + */ + protected void releaseBuffer() { + if (logger.isTraceEnabled()) { + logger.trace("releaseBuffer: " + this.currentBuffer); + } + DataBufferUtils.release(this.currentBuffer); + this.currentBuffer = null; + } + + /** + * Writes the given data buffer to the output, indicating if the entire buffer was + * written. + * @param dataBuffer the data buffer to write + * @return {@code true} if {@code dataBuffer} was fully written and a new buffer + * can be requested; {@code false} otherwise + */ + protected abstract boolean write(DataBuffer dataBuffer) throws IOException; + + /** + * Flushes the output. + */ + protected abstract void flush() throws IOException; + + /** + * Closes the output. + */ + protected abstract void close(); + + private boolean changeSubscriberState(SubscriberState oldState, + SubscriberState newState) { + return this.subscriberState.compareAndSet(oldState, newState); + } + + private boolean changePublisherState(PublisherState oldState, + PublisherState newState) { + return this.publisherState.compareAndSet(oldState, newState); + } + + private static final class ResponseBodySubscription implements Subscription { + + private final AbstractResponseBodyProcessor processor; + + public ResponseBodySubscription(AbstractResponseBodyProcessor processor) { + this.processor = processor; + } + + @Override + public final void request(long n) { + if (this.processor.logger.isTraceEnabled()) { + this.processor.logger.trace("PUB " + state() + " request: " + n); + } + state().request(this.processor, n); + } + + @Override + public final void cancel() { + if (this.processor.logger.isTraceEnabled()) { + this.processor.logger.trace("PUB " + state() + " cancel"); + } + state().cancel(this.processor); + } + + private PublisherState state() { + return this.processor.publisherState.get(); + } + + } + + /** + * Represents a state for the {@link Subscriber} to be in. The following figure + * indicate the four different states that exist, and the relationships between them. + * + *
+	 *       UNSUBSCRIBED
+	 *        |
+	 *        v
+	 * REQUESTED -------------------> RECEIVED
+	 *         ^                      ^
+	 *         |                      |
+	 *         --------- WRITING <-----
+	 *                      |
+	 *                      v
+	 *                  COMPLETED
+	 * 
+ * Refer to the individual states for more information. + */ + private enum SubscriberState { + + /** + * The initial unsubscribed state. Will respond to {@code onSubscribe} by + * requesting 1 buffer from the subscription, and change state to {@link + * #REQUESTED}. + */ + UNSUBSCRIBED { + @Override + void onSubscribe(AbstractResponseBodyProcessor processor, + Subscription subscription) { + Objects.requireNonNull(subscription, "Subscription cannot be null"); + if (processor.changeSubscriberState(this, REQUESTED)) { + processor.subscription = subscription; + subscription.request(1); + } + else { + super.onSubscribe(processor, subscription); + } + } + }, + /** + * State that gets entered after a buffer has been + * {@linkplain Subscription#request(long) requested}. Responds to {@code onNext} + * by changing state to {@link #RECEIVED}, and responds to {@code onComplete} by + * changing state to {@link #COMPLETED}. + */ + REQUESTED { + @Override + void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) { + if (processor.changeSubscriberState(this, RECEIVED)) { + processor.receiveBuffer(dataBuffer); + } + } + + @Override + void onComplete(AbstractResponseBodyProcessor processor) { + if (processor.changeSubscriberState(this, COMPLETED)) { + processor.subscriberCompleted = true; + processor.close(); + processor.publishComplete(); + } + } + }, + /** + * State that gets entered after a buffer has been + * {@linkplain Subscriber#onNext(Object) received}. Responds to + * {@code onWritePossible} by writing the current buffer and changes + * the state to {@link #WRITING}. If it can be written completely, + * changes the state to either {@link #REQUESTED} if the subscription + * has not been completed; or {@link #COMPLETED} if it has. If it cannot + * be written completely the state will be changed to {@link #RECEIVED}. + */ + RECEIVED { + @Override + void onWritePossible(AbstractResponseBodyProcessor processor) { + if (processor.changeSubscriberState(this, WRITING)) { + DataBuffer dataBuffer = processor.currentBuffer; + try { + boolean writeCompleted = processor.write(dataBuffer); + if (writeCompleted) { + if (dataBuffer instanceof FlushingDataBuffer) { + processor.flush(); + } + processor.releaseBuffer(); + if (!processor.subscriberCompleted) { + processor.changeSubscriberState(WRITING, REQUESTED); + processor.subscription.request(1); + } + else { + processor.changeSubscriberState(WRITING, COMPLETED); + processor.close(); + processor.publishComplete(); + } + } + else { + processor.changeSubscriberState(WRITING, RECEIVED); + processor.checkOnWritePossible(); + } + } + catch (IOException ex) { + processor.onError(ex); + } + } + } + + @Override + void onComplete(AbstractResponseBodyProcessor processor) { + processor.subscriberCompleted = true; + } + }, + /** + * State that gets entered after a writing of the current buffer has been + * {@code onWritePossible started}. + */ + WRITING { + @Override + void onComplete(AbstractResponseBodyProcessor processor) { + processor.subscriberCompleted = true; + } + }, + /** + * The terminal completed state. Does not respond to any events. + */ + COMPLETED { + @Override + void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) { + // ignore + } + + @Override + void onError(AbstractResponseBodyProcessor processor, Throwable t) { + // ignore + } + + @Override + void onComplete(AbstractResponseBodyProcessor processor) { + // ignore + } + }; + + void onSubscribe(AbstractResponseBodyProcessor processor, Subscription s) { + s.cancel(); + } + + void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) { + throw new IllegalStateException(toString()); + } + + void onError(AbstractResponseBodyProcessor processor, Throwable t) { + if (processor.changeSubscriberState(this, COMPLETED)) { + processor.publishError(t); + } + } + + void onComplete(AbstractResponseBodyProcessor processor) { + throw new IllegalStateException(toString()); + } + + void onWritePossible(AbstractResponseBodyProcessor processor) { + // ignore + } + } + + private enum PublisherState { + UNSUBSCRIBED { + @Override + void subscribe(AbstractResponseBodyProcessor processor, + Subscriber subscriber) { + Objects.requireNonNull(subscriber); + if (processor.changePublisherState(this, SUBSCRIBED)) { + Subscription subscription = new ResponseBodySubscription(processor); + processor.subscriber = subscriber; + subscriber.onSubscribe(subscription); + if (processor.publisherCompleted) { + processor.publishComplete(); + } + else if (processor.publisherError != null) { + processor.publishError(processor.publisherError); + } + } + else { + throw new IllegalStateException(toString()); + } + } + + @Override + void publishComplete(AbstractResponseBodyProcessor processor) { + processor.publisherCompleted = true; + } + + @Override + void publishError(AbstractResponseBodyProcessor processor, Throwable t) { + processor.publisherError = t; + } + }, + SUBSCRIBED { + @Override + void request(AbstractResponseBodyProcessor processor, long n) { + BackpressureUtils.checkRequest(n, processor.subscriber); + } + + @Override + void publishComplete(AbstractResponseBodyProcessor processor) { + if (processor.changePublisherState(this, COMPLETED)) { + processor.subscriber.onComplete(); + } + } + + @Override + void publishError(AbstractResponseBodyProcessor processor, Throwable t) { + if (processor.changePublisherState(this, COMPLETED)) { + processor.subscriber.onError(t); + } + } + + }, + COMPLETED { + @Override + void request(AbstractResponseBodyProcessor processor, long n) { + // ignore + } + + @Override + void cancel(AbstractResponseBodyProcessor processor) { + // ignore + } + + @Override + void publishComplete(AbstractResponseBodyProcessor processor) { + // ignore + } + + @Override + void publishError(AbstractResponseBodyProcessor processor, Throwable t) { + // ignore + } + }; + + void subscribe(AbstractResponseBodyProcessor processor, + Subscriber subscriber) { + throw new IllegalStateException(toString()); + } + + void request(AbstractResponseBodyProcessor processor, long n) { + throw new IllegalStateException(toString()); + } + + void cancel(AbstractResponseBodyProcessor processor) { + processor.changePublisherState(this, COMPLETED); + } + + void publishComplete(AbstractResponseBodyProcessor processor) { + throw new IllegalStateException(toString()); + } + + void publishError(AbstractResponseBodyProcessor processor, Throwable t) { + throw new IllegalStateException(toString()); + } + + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java deleted file mode 100644 index ea8e28db060..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.reactive; - -import java.io.IOException; -import java.nio.channels.Channel; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; -import javax.servlet.WriteListener; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.FlushingDataBuffer; -import org.springframework.core.io.buffer.support.DataBufferUtils; -import org.springframework.util.Assert; - -/** - * Abstract base class for {@code Subscriber} implementations that bridge between - * event-listener APIs and Reactive Streams. Specifically, base class for the Servlet 3.1 - * and Undertow support. - * @author Arjen Poutsma - * @see ServletServerHttpRequest - * @see UndertowHttpHandlerAdapter - */ -abstract class AbstractResponseBodySubscriber implements Subscriber { - - protected final Log logger = LogFactory.getLog(getClass()); - - private final AtomicReference state = - new AtomicReference<>(State.UNSUBSCRIBED); - - private volatile DataBuffer currentBuffer; - - private volatile boolean subscriptionCompleted; - - private Subscription subscription; - - @Override - public final void onSubscribe(Subscription subscription) { - if (logger.isTraceEnabled()) { - logger.trace(this.state + " onSubscribe: " + subscription); - } - this.state.get().onSubscribe(this, subscription); - } - - @Override - public final void onNext(DataBuffer dataBuffer) { - if (logger.isTraceEnabled()) { - logger.trace(this.state + " onNext: " + dataBuffer); - } - this.state.get().onNext(this, dataBuffer); - } - - @Override - public final void onError(Throwable t) { - if (logger.isErrorEnabled()) { - logger.error(this.state + " onError: " + t, t); - } - this.state.get().onError(this, t); - } - - @Override - public final void onComplete() { - if (logger.isTraceEnabled()) { - logger.trace(this.state + " onComplete"); - } - this.state.get().onComplete(this); - } - - /** - * Called via a listener interface to indicate that writing is possible. - * @see WriteListener#onWritePossible() - * @see org.xnio.ChannelListener#handleEvent(Channel) - */ - protected final void onWritePossible() { - this.state.get().onWritePossible(this); - } - - /** - * Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)} - * @param dataBuffer the buffer that was received. - */ - protected void receiveBuffer(DataBuffer dataBuffer) { - Assert.state(this.currentBuffer == null); - this.currentBuffer = dataBuffer; - - checkOnWritePossible(); - } - - /** - * Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)} - * or when only partial data from the {@link DataBuffer} was written. - */ - protected void checkOnWritePossible() { - // no-op - } - - /** - * Called when the current buffer should be - * {@linkplain DataBufferUtils#release(DataBuffer) released}. - */ - protected void releaseBuffer() { - if (logger.isTraceEnabled()) { - logger.trace("releaseBuffer: " + this.currentBuffer); - } - DataBufferUtils.release(this.currentBuffer); - this.currentBuffer = null; - } - - /** - * Writes the given data buffer to the output, indicating if the entire buffer was - * written. - * @param dataBuffer the data buffer to write - * @return {@code true} if {@code dataBuffer} was fully written and a new buffer - * can be requested; {@code false} otherwise - */ - protected abstract boolean write(DataBuffer dataBuffer) throws IOException; - - /** - * Writes the given exception to the output. - */ - protected abstract void writeError(Throwable t); - - /** - * Flushes the output. - */ - protected abstract void flush() throws IOException; - - /** - * Closes the output. - */ - protected abstract void close(); - - private boolean changeState(State oldState, State newState) { - return this.state.compareAndSet(oldState, newState); - } - - /** - * Represents a state for the {@link Subscriber} to be in. The following figure - * indicate the four different states that exist, and the relationships between them. - * - *
-	 *       UNSUBSCRIBED
-	 *        |
-	 *        v
-	 * REQUESTED -------------------> RECEIVED
-	 *         ^                      ^
-	 *         |                      |
-	 *         --------- WRITING <-----
-	 *                      |
-	 *                      v
-	 *                  COMPLETED
-	 * 
- * Refer to the individual states for more information. - */ - private enum State { - - /** - * The initial unsubscribed state. Will respond to {@code onSubscribe} by - * requesting 1 buffer from the subscription, and change state to {@link - * #REQUESTED}. - */ - UNSUBSCRIBED { - @Override - void onSubscribe(AbstractResponseBodySubscriber subscriber, - Subscription subscription) { - Objects.requireNonNull(subscription, "Subscription cannot be null"); - if (subscriber.changeState(this, REQUESTED)) { - subscriber.subscription = subscription; - subscription.request(1); - } - else { - super.onSubscribe(subscriber, subscription); - } - } - }, - /** - * State that gets entered after a buffer has been - * {@linkplain Subscription#request(long) requested}. Responds to {@code onNext} - * by changing state to {@link #RECEIVED}, and responds to {@code onComplete} by - * changing state to {@link #COMPLETED}. - */ - REQUESTED { - @Override - void onNext(AbstractResponseBodySubscriber subscriber, - DataBuffer dataBuffer) { - if (subscriber.changeState(this, RECEIVED)) { - subscriber.receiveBuffer(dataBuffer); - } - } - - @Override - void onComplete(AbstractResponseBodySubscriber subscriber) { - if (subscriber.changeState(this, COMPLETED)) { - subscriber.subscriptionCompleted = true; - subscriber.close(); - } - } - }, - /** - * State that gets entered after a buffer has been - * {@linkplain Subscriber#onNext(Object) received}. Responds to - * {@code onWritePossible} by writing the current buffer and changes - * the state to {@link #WRITING}. If it can be written completely, - * changes the state to either {@link #REQUESTED} if the subscription - * has not been completed; or {@link #COMPLETED} if it has. If it cannot - * be written completely the state will be changed to {@link #RECEIVED}. - */ - RECEIVED { - @Override - void onWritePossible(AbstractResponseBodySubscriber subscriber) { - if (subscriber.changeState(this, WRITING)) { - DataBuffer dataBuffer = subscriber.currentBuffer; - try { - boolean writeCompleted = subscriber.write(dataBuffer); - if (writeCompleted) { - if (dataBuffer instanceof FlushingDataBuffer) { - subscriber.flush(); - } - subscriber.releaseBuffer(); - boolean subscriptionCompleted = subscriber.subscriptionCompleted; - if (!subscriptionCompleted) { - subscriber.changeState(WRITING, REQUESTED); - subscriber.subscription.request(1); - } - else { - subscriber.changeState(WRITING, COMPLETED); - subscriber.close(); - } - } - else { - subscriber.changeState(WRITING, RECEIVED); - subscriber.checkOnWritePossible(); - } - } - catch (IOException ex) { - subscriber.onError(ex); - } - } - } - - @Override - void onComplete(AbstractResponseBodySubscriber subscriber) { - subscriber.subscriptionCompleted = true; - } - }, - /** - * State that gets entered after a writing of the current buffer has been - * {@code onWritePossible started}. - */ - WRITING { - @Override - void onComplete(AbstractResponseBodySubscriber subscriber) { - subscriber.subscriptionCompleted = true; - } - }, - /** - * The terminal completed state. Does not respond to any events. - */ - COMPLETED { - @Override - void onNext(AbstractResponseBodySubscriber subscriber, - DataBuffer dataBuffer) { - // ignore - } - - @Override - void onError(AbstractResponseBodySubscriber subscriber, Throwable t) { - // ignore - } - - @Override - void onComplete(AbstractResponseBodySubscriber subscriber) { - // ignore - } - }; - - void onSubscribe(AbstractResponseBodySubscriber subscriber, Subscription s) { - s.cancel(); - } - - void onNext(AbstractResponseBodySubscriber subscriber, DataBuffer dataBuffer) { - throw new IllegalStateException(toString()); - } - - void onError(AbstractResponseBodySubscriber subscriber, Throwable t) { - if (subscriber.changeState(this, COMPLETED)) { - subscriber.writeError(t); - subscriber.close(); - } - } - - void onComplete(AbstractResponseBodySubscriber subscriber) { - throw new IllegalStateException(toString()); - } - - void onWritePossible(AbstractResponseBodySubscriber subscriber) { - // ignore - } - - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 0416445ffa7..61ff7409e3e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -38,7 +38,6 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; -import org.springframework.http.HttpStatus; import org.springframework.util.Assert; /** @@ -91,13 +90,15 @@ public class ServletHttpHandlerAdapter extends HttpServlet { ServletServerHttpRequest request = new ServletServerHttpRequest(servletRequest, requestBody); - ResponseBodySubscriber responseBody = - new ResponseBodySubscriber(synchronizer, this.bufferSize); + ResponseBodyProcessor responseBody = + new ResponseBodyProcessor(synchronizer, this.bufferSize); responseBody.registerListener(); ServletServerHttpResponse response = new ServletServerHttpResponse(servletResponse, this.dataBufferFactory, - publisher -> Mono - .from(subscriber -> publisher.subscribe(responseBody))); + publisher -> Mono.from(subscriber -> { + publisher.subscribe(responseBody); + responseBody.subscribe(subscriber); + })); HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(synchronizer); @@ -129,7 +130,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet { logger.error("Error from request handling. Completing the request.", ex); HttpServletResponse response = (HttpServletResponse) this.synchronizer.getResponse(); - response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); this.synchronizer.complete(); } @@ -206,8 +207,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } } - - private static class ResponseBodySubscriber extends AbstractResponseBodySubscriber { + private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener(); @@ -218,7 +218,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private volatile boolean flushOnNext; - public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer, + public ResponseBodyProcessor(ServletAsyncContextSynchronizer synchronizer, int bufferSize) { this.synchronizer = synchronizer; this.bufferSize = bufferSize; @@ -272,13 +272,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } } - @Override - protected void writeError(Throwable t) { - HttpServletResponse response = - (HttpServletResponse) this.synchronizer.getResponse(); - response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - } - @Override protected void flush() throws IOException { ServletOutputStream output = outputStream(); @@ -324,14 +317,14 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @Override public void onWritePossible() throws IOException { - ResponseBodySubscriber.this.onWritePossible(); + ResponseBodyProcessor.this.onWritePossible(); } @Override public void onError(Throwable ex) { // Error on writing to the HTTP stream, so any further writes will probably // fail. Let's log instead of calling {@link #writeError}. - ResponseBodySubscriber.this.logger + ResponseBodyProcessor.this.logger .error("ResponseBodyWriteListener error", ex); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index a856a7a1019..7380725e55e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -70,11 +70,8 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle @Override public void onError(Throwable ex) { - if (exchange.isResponseStarted() || exchange.getStatusCode() > 500) { - logger.error("Error from request handling. Completing the request.", - ex); - } - else { + logger.error("Error from request handling. Completing the request.", ex); + if (!exchange.isResponseStarted() && exchange.getStatusCode() <= 500) { exchange.setStatusCode(500); } exchange.endExchange(); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 3dc11cf3954..0910723ea89 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -79,13 +79,14 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse @Override protected Mono writeWithInternal(Publisher publisher) { - return Mono.from(s -> { - // lazily create Subscriber, since calling - // {@link HttpServerExchange#getResponseChannel} as done in the - // ResponseBodySubscriber constructor commits the response status and headers - ResponseBodySubscriber subscriber = new ResponseBodySubscriber(this.exchange); - subscriber.registerListener(); - publisher.subscribe(subscriber); + // lazily create Subscriber, since calling + // {@link HttpServerExchange#getResponseChannel} as done in the + // ResponseBodyProcessor constructor commits the response status and headers + return Mono.from(subscriber -> { + ResponseBodyProcessor processor = new ResponseBodyProcessor(this.exchange); + processor.registerListener(); + publisher.subscribe(processor); + processor.subscribe(subscriber); }); } @@ -137,7 +138,7 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse } } - private static class ResponseBodySubscriber extends AbstractResponseBodySubscriber { + private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private final ChannelListener listener = new WriteListener(); @@ -147,7 +148,7 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse private volatile ByteBuffer byteBuffer; - public ResponseBodySubscriber(HttpServerExchange exchange) { + public ResponseBodyProcessor(HttpServerExchange exchange) { this.exchange = exchange; this.responseChannel = exchange.getResponseChannel(); } @@ -157,14 +158,6 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse this.responseChannel.resumeWrites(); } - @Override - protected void writeError(Throwable t) { - if (!this.exchange.isResponseStarted() && - this.exchange.getStatusCode() < 500) { - this.exchange.setStatusCode(500); - } - } - @Override protected void flush() throws IOException { if (logger.isTraceEnabled()) { diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java new file mode 100644 index 00000000000..c7dfa83f739 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java @@ -0,0 +1,108 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.IOException; +import java.net.URI; + +import org.junit.Test; +import reactor.core.publisher.Mono; + +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.ClientHttpResponse; +import org.springframework.http.server.reactive.boot.ReactorHttpServer; +import org.springframework.web.client.ResponseErrorHandler; +import org.springframework.web.client.RestTemplate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeFalse; + +/** + * @author Arjen Poutsma + */ +public class ErrorHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests { + + private ErrorHandler handler = new ErrorHandler(); + + @Override + protected HttpHandler createHttpHandler() { + return handler; + } + + @Test + public void response() throws Exception { + // TODO: fix Reactor + assumeFalse(server instanceof ReactorHttpServer); + + RestTemplate restTemplate = new RestTemplate(); + restTemplate.setErrorHandler(NO_OP_ERROR_HANDLER); + + ResponseEntity response = restTemplate + .getForEntity(new URI("http://localhost:" + port + "/response"), + String.class); + + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); + } + + @Test + public void returnValue() throws Exception { + // TODO: fix Reactor + assumeFalse(server instanceof ReactorHttpServer); + + RestTemplate restTemplate = new RestTemplate(); + restTemplate.setErrorHandler(NO_OP_ERROR_HANDLER); + + ResponseEntity response = restTemplate + .getForEntity(new URI("http://localhost:" + port + "/returnValue"), + String.class); + + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); + } + + private static class ErrorHandler implements HttpHandler { + + @Override + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { + Exception error = new UnsupportedOperationException(); + String path = request.getURI().getPath(); + if (path.endsWith("response")) { + return response.writeWith(Mono.error(error)); + } + else if (path.endsWith("returnValue")) { + return Mono.error(error); + } + else { + return Mono.empty(); + } + } + } + + private static final ResponseErrorHandler NO_OP_ERROR_HANDLER = + new ResponseErrorHandler() { + + @Override + public boolean hasError(ClientHttpResponse response) throws IOException { + return false; + } + + @Override + public void handleError(ClientHttpResponse response) throws IOException { + } + }; + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/RandomHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/RandomHandlerIntegrationTests.java index 095c8f86e02..18062e4360f 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/RandomHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/RandomHandlerIntegrationTests.java @@ -21,8 +21,6 @@ import java.util.Random; import org.junit.Test; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -31,12 +29,9 @@ import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; -import org.springframework.http.server.reactive.boot.ReactorHttpServer; import org.springframework.web.client.RestTemplate; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assume.assumeFalse; +import static org.junit.Assert.*; public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -60,7 +55,6 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio @Test public void random() throws Throwable { // TODO: fix Reactor support - assumeFalse(server instanceof ReactorHttpServer); RestTemplate restTemplate = new RestTemplate(); @@ -72,14 +66,6 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio assertEquals(RESPONSE_SIZE, response.getHeaders().getContentLength()); assertEquals(RESPONSE_SIZE, response.getBody().length); - - while (!handler.requestComplete) { - Thread.sleep(100); - } - if (handler.requestError != null) { - throw handler.requestError; - } - assertEquals(REQUEST_SIZE, handler.requestSize); } @@ -93,45 +79,21 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio public static final int CHUNKS = 16; - private volatile boolean requestComplete; - - private int requestSize; - - private Throwable requestError; - @Override public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { - requestError = null; + Mono requestSizeMono = request.getBody(). + reduce(0, (integer, dataBuffer) -> integer + + dataBuffer.readableByteCount()). + doAfterTerminate((size, throwable) -> { + assertNull(throwable); + assertEquals(REQUEST_SIZE, (long) size); + }); - request.getBody().subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - requestComplete = false; - requestSize = 0; - requestError = null; - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(DataBuffer bytes) { - requestSize += bytes.readableByteCount(); - } - - @Override - public void onError(Throwable t) { - requestComplete = true; - requestError = t; - } - - @Override - public void onComplete() { - requestComplete = true; - } - }); response.getHeaders().setContentLength(RESPONSE_SIZE); - return response.writeWith(multipleChunks()); + + return requestSizeMono.then(response.writeWith(multipleChunks())); } private Publisher singleChunk() {