Refactor ResponseBodySubscriber to Processor

This commit changes the AbstractResponseBodySubscriber into a
AbstractResponseBodyProcessor<DataBuffer, Void>, so that the processor
can be used as a return value for writeWith.

Additional, this commit no longer closes the response after an eror
occurred.

This fixes #59.
This commit is contained in:
Arjen Poutsma 2016-07-05 11:20:18 +02:00
parent c85d1dc126
commit b0de99bc8c
8 changed files with 638 additions and 425 deletions

View File

@ -52,6 +52,8 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
private Subscriber<? super DataBuffer> subscriber;
private volatile boolean dataAvailable;
@Override
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
if (this.logger.isTraceEnabled()) {
@ -199,7 +201,9 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
void subscribe(AbstractRequestBodyPublisher publisher,
Subscriber<? super DataBuffer> subscriber) {
Objects.requireNonNull(subscriber);
if (publisher.changeState(this, DATA_UNAVAILABLE)) {
State newState =
publisher.dataAvailable ? DATA_AVAILABLE : DATA_UNAVAILABLE;
if (publisher.changeState(this, newState)) {
Subscription subscription = new RequestBodySubscription(
publisher);
publisher.subscriber = subscriber;
@ -209,6 +213,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
throw new IllegalStateException(toString());
}
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
publisher.dataAvailable = true;
}
},
/**
* State that gets entered when there is no data to be read. Responds to {@link
@ -252,20 +261,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
// ignore
}
},
/**
* The terminal completed state. Does not respond to any events.
*/
COMPLETED {
@Override
void subscribe(AbstractRequestBodyPublisher publisher,
Subscriber<? super DataBuffer> subscriber) {
// ignore
}
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
@ -277,11 +277,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
// ignore
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
// ignore
}
@Override
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
// ignore
@ -309,7 +304,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
throw new IllegalStateException(toString());
// ignore
}
void onAllDataRead(AbstractRequestBodyPublisher publisher) {

View File

@ -0,0 +1,486 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.WriteListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.util.BackpressureUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.FlushingDataBuffer;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.util.Assert;
/**
* Abstract base class for {@code Subscriber} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the Servlet 3.1
* and Undertow support.
* @author Arjen Poutsma
* @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter
*/
abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Void> {
protected final Log logger = LogFactory.getLog(getClass());
private final AtomicReference<SubscriberState> subscriberState =
new AtomicReference<>(SubscriberState.UNSUBSCRIBED);
private final AtomicReference<PublisherState> publisherState =
new AtomicReference<>(PublisherState.UNSUBSCRIBED);
private volatile DataBuffer currentBuffer;
private volatile boolean subscriberCompleted;
private volatile boolean publisherCompleted;
private volatile Throwable publisherError;
private Subscription subscription;
private Subscriber<? super Void> subscriber;
// Subscriber
@Override
public final void onSubscribe(Subscription subscription) {
if (logger.isTraceEnabled()) {
logger.trace("SUB " + this.subscriberState + " onSubscribe: " + subscription);
}
this.subscriberState.get().onSubscribe(this, subscription);
}
@Override
public final void onNext(DataBuffer dataBuffer) {
if (logger.isTraceEnabled()) {
logger.trace("SUB " + this.subscriberState + " onNext: " + dataBuffer);
}
this.subscriberState.get().onNext(this, dataBuffer);
}
@Override
public final void onError(Throwable t) {
if (logger.isErrorEnabled()) {
logger.error("SUB " + this.subscriberState + " publishError: " + t, t);
}
this.subscriberState.get().onError(this, t);
}
@Override
public final void onComplete() {
if (logger.isTraceEnabled()) {
logger.trace("SUB " + this.subscriberState + " onComplete");
}
this.subscriberState.get().onComplete(this);
}
// Publisher
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
if (logger.isTraceEnabled()) {
logger.trace("PUB " + this.publisherState + " subscribe: " + subscriber);
}
this.publisherState.get().subscribe(this, subscriber);
}
private void publishComplete() {
if (logger.isTraceEnabled()) {
logger.trace("PUB " + this.publisherState + " publishComplete");
}
this.publisherState.get().publishComplete(this);
}
private void publishError(Throwable t) {
if (logger.isTraceEnabled()) {
logger.trace("PUB " + this.publisherState + " publishError: " + t);
}
this.publisherState.get().publishError(this, t);
}
// listener methods
/**
* Called via a listener interface to indicate that writing is possible.
* @see WriteListener#onWritePossible()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void onWritePossible() {
this.subscriberState.get().onWritePossible(this);
}
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* @param dataBuffer the buffer that was received.
*/
protected void receiveBuffer(DataBuffer dataBuffer) {
Assert.state(this.currentBuffer == null);
this.currentBuffer = dataBuffer;
checkOnWritePossible();
}
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* or when only partial data from the {@link DataBuffer} was written.
*/
protected void checkOnWritePossible() {
// no-op
}
/**
* Called when the current buffer should be
* {@linkplain DataBufferUtils#release(DataBuffer) released}.
*/
protected void releaseBuffer() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentBuffer);
}
DataBufferUtils.release(this.currentBuffer);
this.currentBuffer = null;
}
/**
* Writes the given data buffer to the output, indicating if the entire buffer was
* written.
* @param dataBuffer the data buffer to write
* @return {@code true} if {@code dataBuffer} was fully written and a new buffer
* can be requested; {@code false} otherwise
*/
protected abstract boolean write(DataBuffer dataBuffer) throws IOException;
/**
* Flushes the output.
*/
protected abstract void flush() throws IOException;
/**
* Closes the output.
*/
protected abstract void close();
private boolean changeSubscriberState(SubscriberState oldState,
SubscriberState newState) {
return this.subscriberState.compareAndSet(oldState, newState);
}
private boolean changePublisherState(PublisherState oldState,
PublisherState newState) {
return this.publisherState.compareAndSet(oldState, newState);
}
private static final class ResponseBodySubscription implements Subscription {
private final AbstractResponseBodyProcessor processor;
public ResponseBodySubscription(AbstractResponseBodyProcessor processor) {
this.processor = processor;
}
@Override
public final void request(long n) {
if (this.processor.logger.isTraceEnabled()) {
this.processor.logger.trace("PUB " + state() + " request: " + n);
}
state().request(this.processor, n);
}
@Override
public final void cancel() {
if (this.processor.logger.isTraceEnabled()) {
this.processor.logger.trace("PUB " + state() + " cancel");
}
state().cancel(this.processor);
}
private PublisherState state() {
return this.processor.publisherState.get();
}
}
/**
* Represents a state for the {@link Subscriber} to be in. The following figure
* indicate the four different states that exist, and the relationships between them.
*
* <pre>
* UNSUBSCRIBED
* |
* v
* REQUESTED -------------------> RECEIVED
* ^ ^
* | |
* --------- WRITING <-----
* |
* v
* COMPLETED
* </pre>
* Refer to the individual states for more information.
*/
private enum SubscriberState {
/**
* The initial unsubscribed state. Will respond to {@code onSubscribe} by
* requesting 1 buffer from the subscription, and change state to {@link
* #REQUESTED}.
*/
UNSUBSCRIBED {
@Override
void onSubscribe(AbstractResponseBodyProcessor processor,
Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeSubscriberState(this, REQUESTED)) {
processor.subscription = subscription;
subscription.request(1);
}
else {
super.onSubscribe(processor, subscription);
}
}
},
/**
* State that gets entered after a buffer has been
* {@linkplain Subscription#request(long) requested}. Responds to {@code onNext}
* by changing state to {@link #RECEIVED}, and responds to {@code onComplete} by
* changing state to {@link #COMPLETED}.
*/
REQUESTED {
@Override
void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
if (processor.changeSubscriberState(this, RECEIVED)) {
processor.receiveBuffer(dataBuffer);
}
}
@Override
void onComplete(AbstractResponseBodyProcessor processor) {
if (processor.changeSubscriberState(this, COMPLETED)) {
processor.subscriberCompleted = true;
processor.close();
processor.publishComplete();
}
}
},
/**
* State that gets entered after a buffer has been
* {@linkplain Subscriber#onNext(Object) received}. Responds to
* {@code onWritePossible} by writing the current buffer and changes
* the state to {@link #WRITING}. If it can be written completely,
* changes the state to either {@link #REQUESTED} if the subscription
* has not been completed; or {@link #COMPLETED} if it has. If it cannot
* be written completely the state will be changed to {@link #RECEIVED}.
*/
RECEIVED {
@Override
void onWritePossible(AbstractResponseBodyProcessor processor) {
if (processor.changeSubscriberState(this, WRITING)) {
DataBuffer dataBuffer = processor.currentBuffer;
try {
boolean writeCompleted = processor.write(dataBuffer);
if (writeCompleted) {
if (dataBuffer instanceof FlushingDataBuffer) {
processor.flush();
}
processor.releaseBuffer();
if (!processor.subscriberCompleted) {
processor.changeSubscriberState(WRITING, REQUESTED);
processor.subscription.request(1);
}
else {
processor.changeSubscriberState(WRITING, COMPLETED);
processor.close();
processor.publishComplete();
}
}
else {
processor.changeSubscriberState(WRITING, RECEIVED);
processor.checkOnWritePossible();
}
}
catch (IOException ex) {
processor.onError(ex);
}
}
}
@Override
void onComplete(AbstractResponseBodyProcessor processor) {
processor.subscriberCompleted = true;
}
},
/**
* State that gets entered after a writing of the current buffer has been
* {@code onWritePossible started}.
*/
WRITING {
@Override
void onComplete(AbstractResponseBodyProcessor processor) {
processor.subscriberCompleted = true;
}
},
/**
* The terminal completed state. Does not respond to any events.
*/
COMPLETED {
@Override
void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
// ignore
}
@Override
void onError(AbstractResponseBodyProcessor processor, Throwable t) {
// ignore
}
@Override
void onComplete(AbstractResponseBodyProcessor processor) {
// ignore
}
};
void onSubscribe(AbstractResponseBodyProcessor processor, Subscription s) {
s.cancel();
}
void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
throw new IllegalStateException(toString());
}
void onError(AbstractResponseBodyProcessor processor, Throwable t) {
if (processor.changeSubscriberState(this, COMPLETED)) {
processor.publishError(t);
}
}
void onComplete(AbstractResponseBodyProcessor processor) {
throw new IllegalStateException(toString());
}
void onWritePossible(AbstractResponseBodyProcessor processor) {
// ignore
}
}
private enum PublisherState {
UNSUBSCRIBED {
@Override
void subscribe(AbstractResponseBodyProcessor processor,
Subscriber<? super Void> subscriber) {
Objects.requireNonNull(subscriber);
if (processor.changePublisherState(this, SUBSCRIBED)) {
Subscription subscription = new ResponseBodySubscription(processor);
processor.subscriber = subscriber;
subscriber.onSubscribe(subscription);
if (processor.publisherCompleted) {
processor.publishComplete();
}
else if (processor.publisherError != null) {
processor.publishError(processor.publisherError);
}
}
else {
throw new IllegalStateException(toString());
}
}
@Override
void publishComplete(AbstractResponseBodyProcessor processor) {
processor.publisherCompleted = true;
}
@Override
void publishError(AbstractResponseBodyProcessor processor, Throwable t) {
processor.publisherError = t;
}
},
SUBSCRIBED {
@Override
void request(AbstractResponseBodyProcessor processor, long n) {
BackpressureUtils.checkRequest(n, processor.subscriber);
}
@Override
void publishComplete(AbstractResponseBodyProcessor processor) {
if (processor.changePublisherState(this, COMPLETED)) {
processor.subscriber.onComplete();
}
}
@Override
void publishError(AbstractResponseBodyProcessor processor, Throwable t) {
if (processor.changePublisherState(this, COMPLETED)) {
processor.subscriber.onError(t);
}
}
},
COMPLETED {
@Override
void request(AbstractResponseBodyProcessor processor, long n) {
// ignore
}
@Override
void cancel(AbstractResponseBodyProcessor processor) {
// ignore
}
@Override
void publishComplete(AbstractResponseBodyProcessor processor) {
// ignore
}
@Override
void publishError(AbstractResponseBodyProcessor processor, Throwable t) {
// ignore
}
};
void subscribe(AbstractResponseBodyProcessor processor,
Subscriber<? super Void> subscriber) {
throw new IllegalStateException(toString());
}
void request(AbstractResponseBodyProcessor processor, long n) {
throw new IllegalStateException(toString());
}
void cancel(AbstractResponseBodyProcessor processor) {
processor.changePublisherState(this, COMPLETED);
}
void publishComplete(AbstractResponseBodyProcessor processor) {
throw new IllegalStateException(toString());
}
void publishError(AbstractResponseBodyProcessor processor, Throwable t) {
throw new IllegalStateException(toString());
}
}
}

View File

@ -1,321 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.WriteListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.FlushingDataBuffer;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.util.Assert;
/**
* Abstract base class for {@code Subscriber} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the Servlet 3.1
* and Undertow support.
* @author Arjen Poutsma
* @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter
*/
abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer> {
protected final Log logger = LogFactory.getLog(getClass());
private final AtomicReference<State> state =
new AtomicReference<>(State.UNSUBSCRIBED);
private volatile DataBuffer currentBuffer;
private volatile boolean subscriptionCompleted;
private Subscription subscription;
@Override
public final void onSubscribe(Subscription subscription) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " onSubscribe: " + subscription);
}
this.state.get().onSubscribe(this, subscription);
}
@Override
public final void onNext(DataBuffer dataBuffer) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " onNext: " + dataBuffer);
}
this.state.get().onNext(this, dataBuffer);
}
@Override
public final void onError(Throwable t) {
if (logger.isErrorEnabled()) {
logger.error(this.state + " onError: " + t, t);
}
this.state.get().onError(this, t);
}
@Override
public final void onComplete() {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " onComplete");
}
this.state.get().onComplete(this);
}
/**
* Called via a listener interface to indicate that writing is possible.
* @see WriteListener#onWritePossible()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void onWritePossible() {
this.state.get().onWritePossible(this);
}
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* @param dataBuffer the buffer that was received.
*/
protected void receiveBuffer(DataBuffer dataBuffer) {
Assert.state(this.currentBuffer == null);
this.currentBuffer = dataBuffer;
checkOnWritePossible();
}
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* or when only partial data from the {@link DataBuffer} was written.
*/
protected void checkOnWritePossible() {
// no-op
}
/**
* Called when the current buffer should be
* {@linkplain DataBufferUtils#release(DataBuffer) released}.
*/
protected void releaseBuffer() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentBuffer);
}
DataBufferUtils.release(this.currentBuffer);
this.currentBuffer = null;
}
/**
* Writes the given data buffer to the output, indicating if the entire buffer was
* written.
* @param dataBuffer the data buffer to write
* @return {@code true} if {@code dataBuffer} was fully written and a new buffer
* can be requested; {@code false} otherwise
*/
protected abstract boolean write(DataBuffer dataBuffer) throws IOException;
/**
* Writes the given exception to the output.
*/
protected abstract void writeError(Throwable t);
/**
* Flushes the output.
*/
protected abstract void flush() throws IOException;
/**
* Closes the output.
*/
protected abstract void close();
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
/**
* Represents a state for the {@link Subscriber} to be in. The following figure
* indicate the four different states that exist, and the relationships between them.
*
* <pre>
* UNSUBSCRIBED
* |
* v
* REQUESTED -------------------> RECEIVED
* ^ ^
* | |
* --------- WRITING <-----
* |
* v
* COMPLETED
* </pre>
* Refer to the individual states for more information.
*/
private enum State {
/**
* The initial unsubscribed state. Will respond to {@code onSubscribe} by
* requesting 1 buffer from the subscription, and change state to {@link
* #REQUESTED}.
*/
UNSUBSCRIBED {
@Override
void onSubscribe(AbstractResponseBodySubscriber subscriber,
Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null");
if (subscriber.changeState(this, REQUESTED)) {
subscriber.subscription = subscription;
subscription.request(1);
}
else {
super.onSubscribe(subscriber, subscription);
}
}
},
/**
* State that gets entered after a buffer has been
* {@linkplain Subscription#request(long) requested}. Responds to {@code onNext}
* by changing state to {@link #RECEIVED}, and responds to {@code onComplete} by
* changing state to {@link #COMPLETED}.
*/
REQUESTED {
@Override
void onNext(AbstractResponseBodySubscriber subscriber,
DataBuffer dataBuffer) {
if (subscriber.changeState(this, RECEIVED)) {
subscriber.receiveBuffer(dataBuffer);
}
}
@Override
void onComplete(AbstractResponseBodySubscriber subscriber) {
if (subscriber.changeState(this, COMPLETED)) {
subscriber.subscriptionCompleted = true;
subscriber.close();
}
}
},
/**
* State that gets entered after a buffer has been
* {@linkplain Subscriber#onNext(Object) received}. Responds to
* {@code onWritePossible} by writing the current buffer and changes
* the state to {@link #WRITING}. If it can be written completely,
* changes the state to either {@link #REQUESTED} if the subscription
* has not been completed; or {@link #COMPLETED} if it has. If it cannot
* be written completely the state will be changed to {@link #RECEIVED}.
*/
RECEIVED {
@Override
void onWritePossible(AbstractResponseBodySubscriber subscriber) {
if (subscriber.changeState(this, WRITING)) {
DataBuffer dataBuffer = subscriber.currentBuffer;
try {
boolean writeCompleted = subscriber.write(dataBuffer);
if (writeCompleted) {
if (dataBuffer instanceof FlushingDataBuffer) {
subscriber.flush();
}
subscriber.releaseBuffer();
boolean subscriptionCompleted = subscriber.subscriptionCompleted;
if (!subscriptionCompleted) {
subscriber.changeState(WRITING, REQUESTED);
subscriber.subscription.request(1);
}
else {
subscriber.changeState(WRITING, COMPLETED);
subscriber.close();
}
}
else {
subscriber.changeState(WRITING, RECEIVED);
subscriber.checkOnWritePossible();
}
}
catch (IOException ex) {
subscriber.onError(ex);
}
}
}
@Override
void onComplete(AbstractResponseBodySubscriber subscriber) {
subscriber.subscriptionCompleted = true;
}
},
/**
* State that gets entered after a writing of the current buffer has been
* {@code onWritePossible started}.
*/
WRITING {
@Override
void onComplete(AbstractResponseBodySubscriber subscriber) {
subscriber.subscriptionCompleted = true;
}
},
/**
* The terminal completed state. Does not respond to any events.
*/
COMPLETED {
@Override
void onNext(AbstractResponseBodySubscriber subscriber,
DataBuffer dataBuffer) {
// ignore
}
@Override
void onError(AbstractResponseBodySubscriber subscriber, Throwable t) {
// ignore
}
@Override
void onComplete(AbstractResponseBodySubscriber subscriber) {
// ignore
}
};
void onSubscribe(AbstractResponseBodySubscriber subscriber, Subscription s) {
s.cancel();
}
void onNext(AbstractResponseBodySubscriber subscriber, DataBuffer dataBuffer) {
throw new IllegalStateException(toString());
}
void onError(AbstractResponseBodySubscriber subscriber, Throwable t) {
if (subscriber.changeState(this, COMPLETED)) {
subscriber.writeError(t);
subscriber.close();
}
}
void onComplete(AbstractResponseBodySubscriber subscriber) {
throw new IllegalStateException(toString());
}
void onWritePossible(AbstractResponseBodySubscriber subscriber) {
// ignore
}
}
}

View File

@ -38,7 +38,6 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
@ -91,13 +90,15 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
ServletServerHttpRequest request =
new ServletServerHttpRequest(servletRequest, requestBody);
ResponseBodySubscriber responseBody =
new ResponseBodySubscriber(synchronizer, this.bufferSize);
ResponseBodyProcessor responseBody =
new ResponseBodyProcessor(synchronizer, this.bufferSize);
responseBody.registerListener();
ServletServerHttpResponse response =
new ServletServerHttpResponse(servletResponse, this.dataBufferFactory,
publisher -> Mono
.from(subscriber -> publisher.subscribe(responseBody)));
publisher -> Mono.from(subscriber -> {
publisher.subscribe(responseBody);
responseBody.subscribe(subscriber);
}));
HandlerResultSubscriber resultSubscriber =
new HandlerResultSubscriber(synchronizer);
@ -129,7 +130,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
logger.error("Error from request handling. Completing the request.", ex);
HttpServletResponse response =
(HttpServletResponse) this.synchronizer.getResponse();
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
this.synchronizer.complete();
}
@ -206,8 +207,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
}
private static class ResponseBodySubscriber extends AbstractResponseBodySubscriber {
private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
private final ResponseBodyWriteListener writeListener =
new ResponseBodyWriteListener();
@ -218,7 +218,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private volatile boolean flushOnNext;
public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer,
public ResponseBodyProcessor(ServletAsyncContextSynchronizer synchronizer,
int bufferSize) {
this.synchronizer = synchronizer;
this.bufferSize = bufferSize;
@ -272,13 +272,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
}
@Override
protected void writeError(Throwable t) {
HttpServletResponse response =
(HttpServletResponse) this.synchronizer.getResponse();
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
@Override
protected void flush() throws IOException {
ServletOutputStream output = outputStream();
@ -324,14 +317,14 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
@Override
public void onWritePossible() throws IOException {
ResponseBodySubscriber.this.onWritePossible();
ResponseBodyProcessor.this.onWritePossible();
}
@Override
public void onError(Throwable ex) {
// Error on writing to the HTTP stream, so any further writes will probably
// fail. Let's log instead of calling {@link #writeError}.
ResponseBodySubscriber.this.logger
ResponseBodyProcessor.this.logger
.error("ResponseBodyWriteListener error", ex);
}
}

View File

@ -70,11 +70,8 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void onError(Throwable ex) {
if (exchange.isResponseStarted() || exchange.getStatusCode() > 500) {
logger.error("Error from request handling. Completing the request.",
ex);
}
else {
logger.error("Error from request handling. Completing the request.", ex);
if (!exchange.isResponseStarted() && exchange.getStatusCode() <= 500) {
exchange.setStatusCode(500);
}
exchange.endExchange();

View File

@ -79,13 +79,14 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse
@Override
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
return Mono.from(s -> {
// lazily create Subscriber, since calling
// {@link HttpServerExchange#getResponseChannel} as done in the
// ResponseBodySubscriber constructor commits the response status and headers
ResponseBodySubscriber subscriber = new ResponseBodySubscriber(this.exchange);
subscriber.registerListener();
publisher.subscribe(subscriber);
// lazily create Subscriber, since calling
// {@link HttpServerExchange#getResponseChannel} as done in the
// ResponseBodyProcessor constructor commits the response status and headers
return Mono.from(subscriber -> {
ResponseBodyProcessor processor = new ResponseBodyProcessor(this.exchange);
processor.registerListener();
publisher.subscribe(processor);
processor.subscribe(subscriber);
});
}
@ -137,7 +138,7 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse
}
}
private static class ResponseBodySubscriber extends AbstractResponseBodySubscriber {
private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
private final ChannelListener<StreamSinkChannel> listener = new WriteListener();
@ -147,7 +148,7 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse
private volatile ByteBuffer byteBuffer;
public ResponseBodySubscriber(HttpServerExchange exchange) {
public ResponseBodyProcessor(HttpServerExchange exchange) {
this.exchange = exchange;
this.responseChannel = exchange.getResponseChannel();
}
@ -157,14 +158,6 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse
this.responseChannel.resumeWrites();
}
@Override
protected void writeError(Throwable t) {
if (!this.exchange.isResponseStarted() &&
this.exchange.getStatusCode() < 500) {
this.exchange.setStatusCode(500);
}
}
@Override
protected void flush() throws IOException {
if (logger.isTraceEnabled()) {

View File

@ -0,0 +1,108 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.net.URI;
import org.junit.Test;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.server.reactive.boot.ReactorHttpServer;
import org.springframework.web.client.ResponseErrorHandler;
import org.springframework.web.client.RestTemplate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeFalse;
/**
* @author Arjen Poutsma
*/
public class ErrorHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private ErrorHandler handler = new ErrorHandler();
@Override
protected HttpHandler createHttpHandler() {
return handler;
}
@Test
public void response() throws Exception {
// TODO: fix Reactor
assumeFalse(server instanceof ReactorHttpServer);
RestTemplate restTemplate = new RestTemplate();
restTemplate.setErrorHandler(NO_OP_ERROR_HANDLER);
ResponseEntity<String> response = restTemplate
.getForEntity(new URI("http://localhost:" + port + "/response"),
String.class);
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode());
}
@Test
public void returnValue() throws Exception {
// TODO: fix Reactor
assumeFalse(server instanceof ReactorHttpServer);
RestTemplate restTemplate = new RestTemplate();
restTemplate.setErrorHandler(NO_OP_ERROR_HANDLER);
ResponseEntity<String> response = restTemplate
.getForEntity(new URI("http://localhost:" + port + "/returnValue"),
String.class);
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode());
}
private static class ErrorHandler implements HttpHandler {
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
Exception error = new UnsupportedOperationException();
String path = request.getURI().getPath();
if (path.endsWith("response")) {
return response.writeWith(Mono.error(error));
}
else if (path.endsWith("returnValue")) {
return Mono.error(error);
}
else {
return Mono.empty();
}
}
}
private static final ResponseErrorHandler NO_OP_ERROR_HANDLER =
new ResponseErrorHandler() {
@Override
public boolean hasError(ClientHttpResponse response) throws IOException {
return false;
}
@Override
public void handleError(ClientHttpResponse response) throws IOException {
}
};
}

View File

@ -21,8 +21,6 @@ import java.util.Random;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -31,12 +29,9 @@ import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.server.reactive.boot.ReactorHttpServer;
import org.springframework.web.client.RestTemplate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assert.*;
public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@ -60,7 +55,6 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio
@Test
public void random() throws Throwable {
// TODO: fix Reactor support
assumeFalse(server instanceof ReactorHttpServer);
RestTemplate restTemplate = new RestTemplate();
@ -72,14 +66,6 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio
assertEquals(RESPONSE_SIZE,
response.getHeaders().getContentLength());
assertEquals(RESPONSE_SIZE, response.getBody().length);
while (!handler.requestComplete) {
Thread.sleep(100);
}
if (handler.requestError != null) {
throw handler.requestError;
}
assertEquals(REQUEST_SIZE, handler.requestSize);
}
@ -93,45 +79,21 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio
public static final int CHUNKS = 16;
private volatile boolean requestComplete;
private int requestSize;
private Throwable requestError;
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
requestError = null;
Mono<Integer> requestSizeMono = request.getBody().
reduce(0, (integer, dataBuffer) -> integer +
dataBuffer.readableByteCount()).
doAfterTerminate((size, throwable) -> {
assertNull(throwable);
assertEquals(REQUEST_SIZE, (long) size);
});
request.getBody().subscribe(new Subscriber<DataBuffer>() {
@Override
public void onSubscribe(Subscription s) {
requestComplete = false;
requestSize = 0;
requestError = null;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(DataBuffer bytes) {
requestSize += bytes.readableByteCount();
}
@Override
public void onError(Throwable t) {
requestComplete = true;
requestError = t;
}
@Override
public void onComplete() {
requestComplete = true;
}
});
response.getHeaders().setContentLength(RESPONSE_SIZE);
return response.writeWith(multipleChunks());
return requestSizeMono.then(response.writeWith(multipleChunks()));
}
private Publisher<DataBuffer> singleChunk() {