diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java index c187a5c49d1..e4463fb6a77 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.reactivestreams.Processor; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import reactor.core.util.BackpressureUtils; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.FlushingDataBuffer; @@ -46,80 +45,57 @@ abstract class AbstractResponseBodyProcessor implements Processor subscriberState = - new AtomicReference<>(SubscriberState.UNSUBSCRIBED); + private final ResponseBodyWriteResultPublisher publisherDelegate = + new ResponseBodyWriteResultPublisher(); - private final AtomicReference publisherState = - new AtomicReference<>(PublisherState.UNSUBSCRIBED); + private final AtomicReference state = + new AtomicReference<>(State.UNSUBSCRIBED); private volatile DataBuffer currentBuffer; private volatile boolean subscriberCompleted; - private volatile boolean publisherCompleted; - - private volatile Throwable publisherError; - private Subscription subscription; - private Subscriber subscriber; - // Subscriber @Override public final void onSubscribe(Subscription subscription) { if (logger.isTraceEnabled()) { - logger.trace("SUB " + this.subscriberState + " onSubscribe: " + subscription); + logger.trace(this.state + " onSubscribe: " + subscription); } - this.subscriberState.get().onSubscribe(this, subscription); + this.state.get().onSubscribe(this, subscription); } @Override public final void onNext(DataBuffer dataBuffer) { if (logger.isTraceEnabled()) { - logger.trace("SUB " + this.subscriberState + " onNext: " + dataBuffer); + logger.trace(this.state + " onNext: " + dataBuffer); } - this.subscriberState.get().onNext(this, dataBuffer); + this.state.get().onNext(this, dataBuffer); } @Override public final void onError(Throwable t) { if (logger.isErrorEnabled()) { - logger.error("SUB " + this.subscriberState + " publishError: " + t, t); + logger.error(this.state + " onError: " + t, t); } - this.subscriberState.get().onError(this, t); + this.state.get().onError(this, t); } @Override public final void onComplete() { if (logger.isTraceEnabled()) { - logger.trace("SUB " + this.subscriberState + " onComplete"); + logger.trace(this.state + " onComplete"); } - this.subscriberState.get().onComplete(this); + this.state.get().onComplete(this); } // Publisher @Override public final void subscribe(Subscriber subscriber) { - if (logger.isTraceEnabled()) { - logger.trace("PUB " + this.publisherState + " subscribe: " + subscriber); - } - this.publisherState.get().subscribe(this, subscriber); - } - - private void publishComplete() { - if (logger.isTraceEnabled()) { - logger.trace("PUB " + this.publisherState + " publishComplete"); - } - this.publisherState.get().publishComplete(this); - } - - private void publishError(Throwable t) { - if (logger.isTraceEnabled()) { - logger.trace("PUB " + this.publisherState + " publishError: " + t); - } - this.publisherState.get().publishError(this, t); + this.publisherDelegate.subscribe(subscriber); } // listener methods @@ -130,7 +106,7 @@ abstract class AbstractResponseBodyProcessor implements Processor * Refer to the individual states for more information. */ - private enum SubscriberState { + private enum State { /** * The initial unsubscribed state. Will respond to {@code onSubscribe} by @@ -253,7 +195,7 @@ abstract class AbstractResponseBodyProcessor implements Processor subscriber) { - Objects.requireNonNull(subscriber); - if (processor.changePublisherState(this, SUBSCRIBED)) { - Subscription subscription = new ResponseBodySubscription(processor); - processor.subscriber = subscriber; - subscriber.onSubscribe(subscription); - if (processor.publisherCompleted) { - processor.publishComplete(); - } - else if (processor.publisherError != null) { - processor.publishError(processor.publisherError); - } - } - else { - throw new IllegalStateException(toString()); - } - } - - @Override - void publishComplete(AbstractResponseBodyProcessor processor) { - processor.publisherCompleted = true; - } - - @Override - void publishError(AbstractResponseBodyProcessor processor, Throwable t) { - processor.publisherError = t; - } - }, - SUBSCRIBED { - @Override - void request(AbstractResponseBodyProcessor processor, long n) { - BackpressureUtils.checkRequest(n, processor.subscriber); - } - - @Override - void publishComplete(AbstractResponseBodyProcessor processor) { - if (processor.changePublisherState(this, COMPLETED)) { - processor.subscriber.onComplete(); - } - } - - @Override - void publishError(AbstractResponseBodyProcessor processor, Throwable t) { - if (processor.changePublisherState(this, COMPLETED)) { - processor.subscriber.onError(t); - } - } - - }, - COMPLETED { - @Override - void request(AbstractResponseBodyProcessor processor, long n) { - // ignore - } - - @Override - void cancel(AbstractResponseBodyProcessor processor) { - // ignore - } - - @Override - void publishComplete(AbstractResponseBodyProcessor processor) { - // ignore - } - - @Override - void publishError(AbstractResponseBodyProcessor processor, Throwable t) { - // ignore - } - }; - - void subscribe(AbstractResponseBodyProcessor processor, - Subscriber subscriber) { - throw new IllegalStateException(toString()); - } - - void request(AbstractResponseBodyProcessor processor, long n) { - throw new IllegalStateException(toString()); - } - - void cancel(AbstractResponseBodyProcessor processor) { - processor.changePublisherState(this, COMPLETED); - } - - void publishComplete(AbstractResponseBodyProcessor processor) { - throw new IllegalStateException(toString()); - } - - void publishError(AbstractResponseBodyProcessor processor, Throwable t) { - throw new IllegalStateException(toString()); - } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java new file mode 100644 index 00000000000..eeaab20ed09 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java @@ -0,0 +1,211 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.util.BackpressureUtils; + +/** + * Publisher returned from {@link ServerHttpResponse#writeWith(Publisher)}. + * @author Arjen Poutsma + */ +class ResponseBodyWriteResultPublisher implements Publisher { + + private static final Log logger = + LogFactory.getLog(ResponseBodyWriteResultPublisher.class); + + private final AtomicReference state = + new AtomicReference<>(State.UNSUBSCRIBED); + + private Subscriber subscriber; + + private volatile boolean publisherCompleted; + + private volatile Throwable publisherError; + + @Override + public final void subscribe(Subscriber subscriber) { + if (logger.isTraceEnabled()) { + logger.trace(this.state + " subscribe: " + subscriber); + } + this.state.get().subscribe(this, subscriber); + } + + private boolean changeState(State oldState, State newState) { + return this.state.compareAndSet(oldState, newState); + } + + /** + * Publishes the complete signal to the subscriber of this publisher. + */ + public void publishComplete() { + if (logger.isTraceEnabled()) { + logger.trace(this.state + " publishComplete"); + } + this.state.get().publishComplete(this); + } + + /** + * Publishes the given error signal to the subscriber of this publisher. + */ + public void publishError(Throwable t) { + if (logger.isTraceEnabled()) { + logger.trace(this.state + " publishError: " + t); + } + this.state.get().publishError(this, t); + } + + private static final class ResponseBodyWriteResultSubscription + implements Subscription { + + private final ResponseBodyWriteResultPublisher publisher; + + public ResponseBodyWriteResultSubscription( + ResponseBodyWriteResultPublisher publisher) { + this.publisher = publisher; + } + + @Override + public final void request(long n) { + if (logger.isTraceEnabled()) { + logger.trace(state() + " request: " + n); + } + state().request(this.publisher, n); + } + + @Override + public final void cancel() { + if (logger.isTraceEnabled()) { + logger.trace(state() + " cancel"); + } + state().cancel(this.publisher); + } + + private State state() { + return this.publisher.state.get(); + } + + } + + private enum State { + UNSUBSCRIBED { + @Override + void subscribe(ResponseBodyWriteResultPublisher publisher, + Subscriber subscriber) { + Objects.requireNonNull(subscriber); + if (publisher.changeState(this, SUBSCRIBED)) { + Subscription subscription = + new ResponseBodyWriteResultSubscription(publisher); + publisher.subscriber = subscriber; + subscriber.onSubscribe(subscription); + if (publisher.publisherCompleted) { + publisher.publishComplete(); + } + else if (publisher.publisherError != null) { + publisher.publishError(publisher.publisherError); + } + } + else { + throw new IllegalStateException(toString()); + } + } + + @Override + void publishComplete(ResponseBodyWriteResultPublisher publisher) { + publisher.publisherCompleted = true; + } + + @Override + void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { + publisher.publisherError = t; + } + }, + SUBSCRIBED { + @Override + void request(ResponseBodyWriteResultPublisher publisher, long n) { + BackpressureUtils.checkRequest(n, publisher.subscriber); + } + + @Override + void publishComplete(ResponseBodyWriteResultPublisher publisher) { + if (publisher.changeState(this, COMPLETED)) { + publisher.subscriber.onComplete(); + } + } + + @Override + void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { + if (publisher.changeState(this, COMPLETED)) { + publisher.subscriber.onError(t); + } + } + + }, + COMPLETED { + @Override + void request(ResponseBodyWriteResultPublisher publisher, long n) { + // ignore + } + + @Override + void cancel(ResponseBodyWriteResultPublisher publisher) { + // ignore + } + + @Override + void publishComplete(ResponseBodyWriteResultPublisher publisher) { + // ignore + } + + @Override + void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { + // ignore + } + }; + + void subscribe(ResponseBodyWriteResultPublisher publisher, + Subscriber subscriber) { + throw new IllegalStateException(toString()); + } + + void request(ResponseBodyWriteResultPublisher publisher, long n) { + throw new IllegalStateException(toString()); + } + + void cancel(ResponseBodyWriteResultPublisher publisher) { + publisher.changeState(this, COMPLETED); + } + + void publishComplete(ResponseBodyWriteResultPublisher publisher) { + throw new IllegalStateException(toString()); + } + + void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { + throw new IllegalStateException(toString()); + } + + } + + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index c8b03c4c50e..e95f39afe30 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -248,14 +248,13 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } @Override - protected void checkOnWritePossible() { + protected boolean isWritePossible() { try { - if (outputStream().isReady()) { - onWritePossible(); - } + return outputStream().isReady(); } catch (IOException ex) { onError(ex); + return false; } } @@ -307,11 +306,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } - @Override - protected void close() { - this.synchronizer.writeComplete(); - } - private int writeDataBuffer(DataBuffer dataBuffer) throws IOException { InputStream input = dataBuffer.asInputStream(); ServletOutputStream output = outputStream(); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 0910723ea89..52f77cbcd34 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -30,8 +30,6 @@ import io.undertow.server.handlers.CookieImpl; import io.undertow.util.HttpString; import org.reactivestreams.Publisher; import org.xnio.ChannelListener; -import org.xnio.ChannelListeners; -import org.xnio.IoUtils; import org.xnio.channels.StreamSinkChannel; import reactor.core.publisher.Mono; @@ -206,23 +204,6 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse this.byteBuffer = null; } - @Override - protected void close() { - try { - this.responseChannel.shutdownWrites(); - - if (!this.responseChannel.flush()) { - this.responseChannel.getWriteSetter().set(ChannelListeners - .flushingChannelListener( - o -> IoUtils.safeClose(this.responseChannel), - ChannelListeners.closingChannelExceptionHandler())); - this.responseChannel.resumeWrites(); - } - } - catch (IOException ignored) { - } - } - private class WriteListener implements ChannelListener { @Override