Rename "Request/ResponseBody" publisher/processor

AbstractRequestBodyPublisher and AbstractResponseBodyProcessor are now
used for WebSocket messages too and have been renamed more generally to
AbstractListenerReadPublisher and AbstractListenerWriteProcessor.

Issue: SPR-14527
This commit is contained in:
Rossen Stoyanchev 2016-12-09 18:22:19 +02:00
parent a2053a516e
commit fe7ee5ff33
8 changed files with 58 additions and 54 deletions

View File

@ -22,8 +22,8 @@ import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.http.server.reactive.AbstractRequestBodyPublisher; import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
import org.springframework.http.server.reactive.AbstractResponseBodyProcessor; import org.springframework.http.server.reactive.AbstractListenerWriteProcessor;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage;
@ -122,7 +122,7 @@ public abstract class AbstractListenerWebSocketSessionSupport<T> extends WebSock
} }
} }
final class WebSocketMessagePublisher extends AbstractRequestBodyPublisher<WebSocketMessage> { final class WebSocketMessagePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {
private volatile WebSocketMessage webSocketMessage; private volatile WebSocketMessage webSocketMessage;
@Override @Override
@ -155,7 +155,7 @@ public abstract class AbstractListenerWebSocketSessionSupport<T> extends WebSock
} }
} }
final class WebSocketMessageProcessor extends AbstractResponseBodyProcessor<WebSocketMessage> { final class WebSocketMessageProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {
private volatile boolean isReady = true; private volatile boolean isReady = true;
@Override @Override

View File

@ -32,15 +32,17 @@ import reactor.core.publisher.Operators;
/** /**
* Abstract base class for {@code Publisher} implementations that bridge between * Abstract base class for {@code Publisher} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the * event-listener read APIs and Reactive Streams. Specifically, a base class for
* Servlet 3.1 and Undertow support. * reading from the HTTP request body with Servlet 3.1 and Undertow as well as
* handling incoming WebSocket messages with JSR-356, Jetty, and Undertow.
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Violeta Georgieva
* @since 5.0 * @since 5.0
* @see ServletServerHttpRequest * @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter * @see UndertowHttpHandlerAdapter
*/ */
public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> { public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
protected final Log logger = LogFactory.getLog(getClass()); protected final Log logger = LogFactory.getLog(getClass());
@ -155,11 +157,11 @@ public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> {
} }
private static final class RequestBodySubscription implements Subscription { private static final class ReadSubscription implements Subscription {
private final AbstractRequestBodyPublisher<?> publisher; private final AbstractListenerReadPublisher<?> publisher;
public RequestBodySubscription(AbstractRequestBodyPublisher<?> publisher) { public ReadSubscription(AbstractListenerReadPublisher<?> publisher) {
this.publisher = publisher; this.publisher = publisher;
} }
@ -207,15 +209,15 @@ public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> {
/** /**
* The initial unsubscribed state. Will respond to {@link * The initial unsubscribed state. Will respond to {@link
* #subscribe(AbstractRequestBodyPublisher, Subscriber)} by * #subscribe(AbstractListenerReadPublisher, Subscriber)} by
* changing state to {@link #NO_DEMAND}. * changing state to {@link #NO_DEMAND}.
*/ */
UNSUBSCRIBED { UNSUBSCRIBED {
@Override @Override
<T> void subscribe(AbstractRequestBodyPublisher<T> publisher, Subscriber<? super T> subscriber) { <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber); Objects.requireNonNull(subscriber);
if (publisher.changeState(this, NO_DEMAND)) { if (publisher.changeState(this, NO_DEMAND)) {
Subscription subscription = new RequestBodySubscription(publisher); Subscription subscription = new ReadSubscription(publisher);
publisher.subscriber = subscriber; publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription); subscriber.onSubscribe(subscription);
} }
@ -227,13 +229,13 @@ public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> {
/** /**
* State that gets entered when there is no demand. Responds to {@link * State that gets entered when there is no demand. Responds to {@link
* #request(AbstractRequestBodyPublisher, long)} by increasing the demand, * #request(AbstractListenerReadPublisher, long)} by increasing the demand,
* changing state to {@link #DEMAND} and will check whether there * changing state to {@link #DEMAND} and will check whether there
* is data available for reading. * is data available for reading.
*/ */
NO_DEMAND { NO_DEMAND {
@Override @Override
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) { <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) { if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n); Operators.addAndGet(publisher.demand, n);
if (publisher.changeState(this, DEMAND)) { if (publisher.changeState(this, DEMAND)) {
@ -245,20 +247,20 @@ public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> {
/** /**
* State that gets entered when there is demand. Responds to * State that gets entered when there is demand. Responds to
* {@link #onDataAvailable(AbstractRequestBodyPublisher)} by * {@link #onDataAvailable(AbstractListenerReadPublisher)} by
* reading the available data. The state will be changed to * reading the available data. The state will be changed to
* {@link #NO_DEMAND} if there is no demand. * {@link #NO_DEMAND} if there is no demand.
*/ */
DEMAND { DEMAND {
@Override @Override
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) { <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) { if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n); Operators.addAndGet(publisher.demand, n);
} }
} }
@Override @Override
<T> void onDataAvailable(AbstractRequestBodyPublisher<T> publisher) { <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
if (publisher.changeState(this, READING)) { if (publisher.changeState(this, READING)) {
try { try {
boolean demandAvailable = publisher.readAndPublish(); boolean demandAvailable = publisher.readAndPublish();
@ -279,7 +281,7 @@ public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> {
READING { READING {
@Override @Override
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) { <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) { if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n); Operators.addAndGet(publisher.demand, n);
} }
@ -291,40 +293,40 @@ public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> {
*/ */
COMPLETED { COMPLETED {
@Override @Override
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) { <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
// ignore // ignore
} }
@Override @Override
<T> void cancel(AbstractRequestBodyPublisher<T> publisher) { <T> void cancel(AbstractListenerReadPublisher<T> publisher) {
// ignore // ignore
} }
@Override @Override
<T> void onAllDataRead(AbstractRequestBodyPublisher<T> publisher) { <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
// ignore // ignore
} }
@Override @Override
<T> void onError(AbstractRequestBodyPublisher<T> publisher, Throwable t) { <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
// ignore // ignore
} }
}; };
<T> void subscribe(AbstractRequestBodyPublisher<T> publisher, Subscriber<? super T> subscriber) { <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
<T> void request(AbstractRequestBodyPublisher<T> publisher, long n) { <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
<T> void cancel(AbstractRequestBodyPublisher<T> publisher) { <T> void cancel(AbstractListenerReadPublisher<T> publisher) {
publisher.changeState(this, COMPLETED); publisher.changeState(this, COMPLETED);
} }
<T> void onDataAvailable(AbstractRequestBodyPublisher<T> publisher) { <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
// ignore // ignore
} }
<T> void onAllDataRead(AbstractRequestBodyPublisher<T> publisher) { <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
if (publisher.changeState(this, COMPLETED)) { if (publisher.changeState(this, COMPLETED)) {
if (publisher.subscriber != null) { if (publisher.subscriber != null) {
publisher.subscriber.onComplete(); publisher.subscriber.onComplete();
@ -332,7 +334,7 @@ public abstract class AbstractRequestBodyPublisher<T> implements Publisher<T> {
} }
} }
<T> void onError(AbstractRequestBodyPublisher<T> publisher, Throwable t) { <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) { if (publisher.changeState(this, COMPLETED)) {
if (publisher.subscriber != null) { if (publisher.subscriber != null) {
publisher.subscriber.onError(t); publisher.subscriber.onError(t);

View File

@ -33,16 +33,18 @@ import org.springframework.util.Assert;
/** /**
* Abstract base class for {@code Processor} implementations that bridge between * Abstract base class for {@code Processor} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the * event-listener write APIs and Reactive Streams. Specifically, base class for
* Servlet 3.1 and Undertow support. * writing to the HTTP response body with Servlet 3.1 and Undertow support as
* well for writing WebSocket messages with JSR-356, Jetty, and Undertow.
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Violeta Georgieva
* @since 5.0 * @since 5.0
* @see ServletServerHttpRequest * @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter * @see UndertowHttpHandlerAdapter
* @see ServerHttpResponse#writeWith(Publisher) * @see ServerHttpResponse#writeWith(Publisher)
*/ */
public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, Void> { public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, Void> {
protected final Log logger = LogFactory.getLog(getClass()); protected final Log logger = LogFactory.getLog(getClass());
@ -190,7 +192,7 @@ public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, V
UNSUBSCRIBED { UNSUBSCRIBED {
@Override @Override
public <T> void onSubscribe(AbstractResponseBodyProcessor<T> processor, Subscription subscription) { public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null"); Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeState(this, REQUESTED)) { if (processor.changeState(this, REQUESTED)) {
processor.subscription = subscription; processor.subscription = subscription;
@ -210,7 +212,7 @@ public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, V
REQUESTED { REQUESTED {
@Override @Override
public <T> void onNext(AbstractResponseBodyProcessor<T> processor, T data) { public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
if (processor.isDataEmpty(data)) { if (processor.isDataEmpty(data)) {
processor.subscription.request(1); processor.subscription.request(1);
} }
@ -223,7 +225,7 @@ public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, V
} }
@Override @Override
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) { public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete(); processor.resultPublisher.publishComplete();
} }
@ -241,7 +243,7 @@ public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, V
RECEIVED { RECEIVED {
@Override @Override
public <T> void onWritePossible(AbstractResponseBodyProcessor<T> processor) { public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
if (processor.changeState(this, WRITING)) { if (processor.changeState(this, WRITING)) {
T data = processor.currentData; T data = processor.currentData;
try { try {
@ -270,7 +272,7 @@ public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, V
} }
@Override @Override
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) { public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true; processor.subscriberCompleted = true;
} }
}, },
@ -281,7 +283,7 @@ public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, V
WRITING { WRITING {
@Override @Override
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) { public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true; processor.subscriberCompleted = true;
} }
}, },
@ -291,40 +293,40 @@ public abstract class AbstractResponseBodyProcessor<T> implements Processor<T, V
COMPLETED { COMPLETED {
@Override @Override
public <T> void onNext(AbstractResponseBodyProcessor<T> processor, T data) { public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
// ignore // ignore
} }
@Override @Override
public <T> void onError(AbstractResponseBodyProcessor<T> processor, Throwable ex) { public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
// ignore // ignore
} }
@Override @Override
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) { public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
// ignore // ignore
} }
}; };
public <T> void onSubscribe(AbstractResponseBodyProcessor<T> processor, Subscription subscription) { public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
subscription.cancel(); subscription.cancel();
} }
public <T> void onNext(AbstractResponseBodyProcessor<T> processor, T data) { public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
public <T> void onError(AbstractResponseBodyProcessor<T> processor, Throwable ex) { public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishError(ex); processor.resultPublisher.publishError(ex);
} }
} }
public <T> void onComplete(AbstractResponseBodyProcessor<T> processor) { public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
public <T> void onWritePossible(AbstractResponseBodyProcessor<T> processor) { public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
// ignore // ignore
} }
} }

View File

@ -203,7 +203,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
} }
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher<DataBuffer> { private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
private final RequestBodyPublisher.RequestBodyReadListener readListener = private final RequestBodyPublisher.RequestBodyReadListener readListener =
new RequestBodyPublisher.RequestBodyReadListener(); new RequestBodyPublisher.RequestBodyReadListener();

View File

@ -184,7 +184,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
} }
private class ResponseBodyProcessor extends AbstractResponseBodyProcessor<DataBuffer> { private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
private final ServletOutputStream outputStream; private final ServletOutputStream outputStream;

View File

@ -106,7 +106,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return Flux.from(this.body); return Flux.from(this.body);
} }
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher<DataBuffer> { private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
private final ChannelListener<StreamSourceChannel> readListener = private final ChannelListener<StreamSourceChannel> readListener =
new ReadListener(); new ReadListener();

View File

@ -139,7 +139,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
} }
private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor<DataBuffer> { private static class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
private final ChannelListener<StreamSinkChannel> listener = new WriteListener(); private final ChannelListener<StreamSinkChannel> listener = new WriteListener();

View File

@ -32,12 +32,12 @@ import org.springframework.core.io.buffer.DataBuffer;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
/** /**
* Unit tests for {@link AbstractRequestBodyPublisher} * Unit tests for {@link AbstractListenerReadPublisher}
* *
* @author Violeta Georgieva * @author Violeta Georgieva
* @since 5.0 * @since 5.0
*/ */
public class AbstractRequestBodyPublisherTests { public class ListenerReadPublisherTests {
@Test @Test
public void testReceiveTwoRequestCallsWhenOnSubscribe() { public void testReceiveTwoRequestCallsWhenOnSubscribe() {
@ -45,14 +45,14 @@ public class AbstractRequestBodyPublisherTests {
Subscriber<DataBuffer> subscriber = mock(Subscriber.class); Subscriber<DataBuffer> subscriber = mock(Subscriber.class);
doAnswer(new SubscriptionAnswer()).when(subscriber).onSubscribe(isA(Subscription.class)); doAnswer(new SubscriptionAnswer()).when(subscriber).onSubscribe(isA(Subscription.class));
TestRequestBodyPublisher publisher = new TestRequestBodyPublisher(); TestListenerReadPublisher publisher = new TestListenerReadPublisher();
publisher.subscribe(subscriber); publisher.subscribe(subscriber);
publisher.onDataAvailable(); publisher.onDataAvailable();
assertTrue(publisher.getReadCalls() == 2); assertTrue(publisher.getReadCalls() == 2);
} }
private static final class TestRequestBodyPublisher extends AbstractRequestBodyPublisher { private static final class TestListenerReadPublisher extends AbstractListenerReadPublisher {
private int readCalls = 0; private int readCalls = 0;