Generic AbstractRequest/ResponseBodyProcessor

In preparation for use with WebSockets.

Issue: SPR-14527
This commit is contained in:
Violeta Georgieva 2016-12-08 18:39:45 +02:00 committed by Rossen Stoyanchev
parent e49813f2c4
commit 41ece612cf
6 changed files with 111 additions and 94 deletions

View File

@ -30,8 +30,6 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Operators;
import org.springframework.core.io.buffer.DataBuffer;
/**
* Abstract base class for {@code Publisher} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the
@ -42,7 +40,7 @@ import org.springframework.core.io.buffer.DataBuffer;
* @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter
*/
abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> {
protected final Log logger = LogFactory.getLog(getClass());
@ -50,11 +48,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
private final AtomicLong demand = new AtomicLong();
private Subscriber<? super DataBuffer> subscriber;
private Subscriber<? super T> subscriber;
@Override
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
public void subscribe(Subscriber<? super T> subscriber) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " subscribe: " + subscriber);
}
@ -66,7 +64,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
* @see ReadListener#onDataAvailable()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void onDataAvailable() {
public final void onDataAvailable() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onDataAvailable");
}
@ -78,7 +76,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
* @see ReadListener#onAllDataRead()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void onAllDataRead() {
public final void onAllDataRead() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onAllDataRead");
}
@ -86,11 +84,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
/**
* Called by a listener interface to indicate that as error has occured.
* Called by a listener interface to indicate that as error has occurred.
* @param t the error
* @see ReadListener#onError(Throwable)
*/
protected final void onError(Throwable t) {
public final void onError(Throwable t) {
if (this.logger.isErrorEnabled()) {
this.logger.error(this.state + " onError: " + t, t);
}
@ -98,16 +96,16 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
/**
* Reads and publishes data buffers from the input. Continues till either there is no
* Reads and publishes data from the input. Continues till either there is no
* more demand, or till there is no more data to be read.
* @return {@code true} if there is more demand; {@code false} otherwise
*/
private boolean readAndPublish() throws IOException {
while (hasDemand()) {
DataBuffer dataBuffer = read();
if (dataBuffer != null) {
T data = read();
if (data != null) {
getAndSub(this.demand, 1L);
this.subscriber.onNext(dataBuffer);
this.subscriber.onNext(data);
}
else {
return true;
@ -142,11 +140,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
protected abstract void checkOnDataAvailable();
/**
* Reads a data buffer from the input, if possible. Returns {@code null} if a buffer
* Reads a data from the input, if possible. Returns {@code null} if a data
* could not be read.
* @return the data buffer that was read; or {@code null}
* @return the data that was read; or {@code null}
*/
protected abstract DataBuffer read() throws IOException;
protected abstract T read() throws IOException;
private boolean hasDemand() {
return (this.demand.get() > 0);
@ -159,9 +157,9 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
private static final class RequestBodySubscription implements Subscription {
private final AbstractRequestBodyPublisher publisher;
private final AbstractRequestBodyPublisher<?> publisher;
public RequestBodySubscription(AbstractRequestBodyPublisher publisher) {
public RequestBodySubscription(AbstractRequestBodyPublisher<?> publisher) {
this.publisher = publisher;
}
@ -214,7 +212,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
*/
UNSUBSCRIBED {
@Override
void subscribe(AbstractRequestBodyPublisher publisher, Subscriber<? super DataBuffer> subscriber) {
<T> void subscribe(AbstractRequestBodyPublisher<T> publisher, Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber);
if (publisher.changeState(this, NO_DEMAND)) {
Subscription subscription = new RequestBodySubscription(publisher);
@ -235,7 +233,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
*/
NO_DEMAND {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
if (publisher.changeState(this, DEMAND)) {
@ -253,14 +251,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
*/
DEMAND {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
}
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
<T> void onDataAvailable(AbstractRequestBodyPublisher<T> publisher) {
if (publisher.changeState(this, READING)) {
try {
boolean demandAvailable = publisher.readAndPublish();
@ -281,7 +279,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
READING {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
}
@ -293,40 +291,40 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
*/
COMPLETED {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) {
// ignore
}
@Override
void cancel(AbstractRequestBodyPublisher publisher) {
<T> void cancel(AbstractRequestBodyPublisher<T> publisher) {
// ignore
}
@Override
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
<T> void onAllDataRead(AbstractRequestBodyPublisher<T> publisher) {
// ignore
}
@Override
void onError(AbstractRequestBodyPublisher publisher, Throwable t) {
<T> void onError(AbstractRequestBodyPublisher<T> publisher, Throwable t) {
// ignore
}
};
void subscribe(AbstractRequestBodyPublisher publisher, Subscriber<? super DataBuffer> subscriber) {
<T> void subscribe(AbstractRequestBodyPublisher<T> publisher, Subscriber<? super T> subscriber) {
throw new IllegalStateException(toString());
}
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) {
throw new IllegalStateException(toString());
}
void cancel(AbstractRequestBodyPublisher publisher) {
<T> void cancel(AbstractRequestBodyPublisher<T> publisher) {
publisher.changeState(this, COMPLETED);
}
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
<T> void onDataAvailable(AbstractRequestBodyPublisher<T> publisher) {
// ignore
}
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
<T> void onAllDataRead(AbstractRequestBodyPublisher<T> publisher) {
if (publisher.changeState(this, COMPLETED)) {
if (publisher.subscriber != null) {
publisher.subscriber.onComplete();
@ -334,7 +332,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
}
void onError(AbstractRequestBodyPublisher publisher, Throwable t) {
<T> void onError(AbstractRequestBodyPublisher<T> publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
if (publisher.subscriber != null) {
publisher.subscriber.onError(t);

View File

@ -29,8 +29,6 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.util.Assert;
/**
@ -44,7 +42,7 @@ import org.springframework.util.Assert;
* @see UndertowHttpHandlerAdapter
* @see ServerHttpResponse#writeWith(Publisher)
*/
abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Void> {
public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, Void> {
protected final Log logger = LogFactory.getLog(getClass());
@ -52,7 +50,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
private volatile DataBuffer currentBuffer;
protected volatile T currentData;
private volatile boolean subscriberCompleted;
@ -70,11 +68,11 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
@Override
public final void onNext(DataBuffer dataBuffer) {
public final void onNext(T data) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " onNext: " + dataBuffer);
logger.trace(this.state + " onNext: " + data);
}
this.state.get().onNext(this, dataBuffer);
this.state.get().onNext(this, data);
}
@Override
@ -109,34 +107,29 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* @see WriteListener#onWritePossible()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void onWritePossible() {
public final void onWritePossible() {
this.state.get().onWritePossible(this);
}
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* @param dataBuffer the buffer that was received.
* Called when a data is received via {@link Subscriber#onNext(Object)}
* @param data the data that was received.
*/
protected void receiveBuffer(DataBuffer dataBuffer) {
Assert.state(this.currentBuffer == null);
this.currentBuffer = dataBuffer;
protected void receiveData(T data) {
Assert.state(this.currentData == null);
this.currentData = data;
}
/**
* Called when the current buffer should be
* {@linkplain DataBufferUtils#release(DataBuffer) released}.
* Called when the current data should be released.
*/
protected void releaseBuffer() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentBuffer);
}
DataBufferUtils.release(this.currentBuffer);
this.currentBuffer = null;
}
protected abstract void releaseData();
protected abstract boolean isDataEmpty(T data);
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* or when only partial data from the {@link DataBuffer} was written.
* Called when a data is received via {@link Subscriber#onNext(Object)}
* or when only partial data was written.
*/
private void writeIfPossible() {
if (isWritePossible()) {
@ -152,15 +145,15 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
/**
* Writes the given data buffer to the output, indicating if the entire buffer was
* Writes the given data to the output, indicating if the entire data was
* written.
* @param dataBuffer the data buffer to write
* @return {@code true} if {@code dataBuffer} was fully written and a new buffer
* @param data the data to write
* @return {@code true} if the data was fully written and a new data
* can be requested; {@code false} otherwise
*/
protected abstract boolean write(DataBuffer dataBuffer) throws IOException;
protected abstract boolean write(T data) throws IOException;
protected void cancel() {
public void cancel() {
this.subscription.cancel();
}
@ -191,13 +184,13 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
/**
* The initial unsubscribed state. Will respond to {@code onSubscribe} by
* requesting 1 buffer from the subscription, and change state to {@link
* requesting 1 data from the subscription, and change state to {@link
* #REQUESTED}.
*/
UNSUBSCRIBED {
@Override
public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) {
public <T> void onSubscribe(AbstractResponseBodyProcessor<T> processor, Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeState(this, REQUESTED)) {
processor.subscription = subscription;
@ -209,7 +202,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
},
/**
* State that gets entered after a buffer has been
* State that gets entered after a data has been
* {@linkplain Subscription#request(long) requested}. Responds to {@code onNext}
* by changing state to {@link #RECEIVED}, and responds to {@code onComplete} by
* changing state to {@link #COMPLETED}.
@ -217,12 +210,12 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
REQUESTED {
@Override
public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
if (dataBuffer.readableByteCount() == 0) {
public <T> void onNext(AbstractResponseBodyProcessor<T> processor, T data) {
if (processor.isDataEmpty(data)) {
processor.subscription.request(1);
}
else {
processor.receiveBuffer(dataBuffer);
processor.receiveData(data);
if (processor.changeState(this, RECEIVED)) {
processor.writeIfPossible();
}
@ -230,16 +223,16 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
@Override
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) {
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
}
},
/**
* State that gets entered after a buffer has been
* State that gets entered after a data has been
* {@linkplain Subscriber#onNext(Object) received}. Responds to
* {@code onWritePossible} by writing the current buffer and changes
* {@code onWritePossible} by writing the current data and changes
* the state to {@link #WRITING}. If it can be written completely,
* changes the state to either {@link #REQUESTED} if the subscription
* has not been completed; or {@link #COMPLETED} if it has. If it cannot
@ -248,13 +241,13 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
RECEIVED {
@Override
public void onWritePossible(AbstractResponseBodyProcessor processor) {
public <T> void onWritePossible(AbstractResponseBodyProcessor<T> processor) {
if (processor.changeState(this, WRITING)) {
DataBuffer dataBuffer = processor.currentBuffer;
T data = processor.currentData;
try {
boolean writeCompleted = processor.write(dataBuffer);
boolean writeCompleted = processor.write(data);
if (writeCompleted) {
processor.releaseBuffer();
processor.releaseData();
if (!processor.subscriberCompleted) {
processor.changeState(WRITING, REQUESTED);
processor.subscription.request(1);
@ -277,18 +270,18 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
@Override
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) {
processor.subscriberCompleted = true;
}
},
/**
* State that gets entered after a writing of the current buffer has been
* State that gets entered after a writing of the current data has been
* {@code onWritePossible started}.
*/
WRITING {
@Override
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) {
processor.subscriberCompleted = true;
}
},
@ -298,40 +291,40 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
COMPLETED {
@Override
public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
public <T> void onNext(AbstractResponseBodyProcessor<T> processor, T data) {
// ignore
}
@Override
public void onError(AbstractResponseBodyProcessor processor, Throwable ex) {
public <T> void onError(AbstractResponseBodyProcessor<T> processor, Throwable ex) {
// ignore
}
@Override
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) {
// ignore
}
};
public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) {
public <T> void onSubscribe(AbstractResponseBodyProcessor<T> processor, Subscription subscription) {
subscription.cancel();
}
public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
public <T> void onNext(AbstractResponseBodyProcessor<T> processor, T data) {
throw new IllegalStateException(toString());
}
public void onError(AbstractResponseBodyProcessor processor, Throwable ex) {
public <T> void onError(AbstractResponseBodyProcessor<T> processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishError(ex);
}
}
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) {
throw new IllegalStateException(toString());
}
public void onWritePossible(AbstractResponseBodyProcessor processor) {
public <T> void onWritePossible(AbstractResponseBodyProcessor<T> processor) {
// ignore
}
}

View File

@ -203,7 +203,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
}
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher {
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher<DataBuffer> {
private final RequestBodyPublisher.RequestBodyReadListener readListener =
new RequestBodyPublisher.RequestBodyReadListener();

View File

@ -32,6 +32,7 @@ import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
@ -183,7 +184,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
private class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
private class ResponseBodyProcessor extends AbstractResponseBodyProcessor<DataBuffer> {
private final ServletOutputStream outputStream;
@ -199,6 +200,20 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
return this.outputStream.isReady();
}
@Override
protected void releaseData() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentData);
}
DataBufferUtils.release(this.currentData);
this.currentData = null;
}
@Override
protected boolean isDataEmpty(DataBuffer dataBuffer) {
return dataBuffer.readableByteCount() == 0;
}
@Override
protected boolean write(DataBuffer dataBuffer) throws IOException {
if (ServletServerHttpResponse.this.flushOnNext) {

View File

@ -106,7 +106,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return Flux.from(this.body);
}
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher {
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher<DataBuffer> {
private final ChannelListener<StreamSourceChannel> readListener =
new ReadListener();

View File

@ -36,6 +36,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ZeroCopyHttpOutputMessage;
@ -138,7 +139,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
}
private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor<DataBuffer> {
private final ChannelListener<StreamSinkChannel> listener = new WriteListener();
@ -187,17 +188,27 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
}
@Override
protected void receiveBuffer(DataBuffer dataBuffer) {
super.receiveBuffer(dataBuffer);
protected void receiveData(DataBuffer dataBuffer) {
super.receiveData(dataBuffer);
this.byteBuffer = dataBuffer.asByteBuffer();
}
@Override
protected void releaseBuffer() {
super.releaseBuffer();
protected void releaseData() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentData);
}
DataBufferUtils.release(this.currentData);
this.currentData = null;
this.byteBuffer = null;
}
@Override
protected boolean isDataEmpty(DataBuffer dataBuffer) {
return dataBuffer.readableByteCount() == 0;
}
private class WriteListener implements ChannelListener<StreamSinkChannel> {
@Override