AbstractResponseBodySubscriber improvements

- AbstractResponseBodySubscriber now checks if the current state is
   expected before changing to a new state.
 - Included comments by @violetagg
This commit is contained in:
Arjen Poutsma 2016-06-28 11:17:57 +02:00
parent 13b6f4fee4
commit 3a681fba89
2 changed files with 27 additions and 21 deletions

View File

@ -139,8 +139,8 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
*/ */
protected abstract void close(); protected abstract void close();
private void changeState(State oldState, State newState) { private boolean changeState(State oldState, State newState) {
this.state.compareAndSet(oldState, newState); return this.state.compareAndSet(oldState, newState);
} }
/** /**
@ -171,8 +171,9 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
Subscription subscription) { Subscription subscription) {
if (BackpressureUtils.validate(subscriber.subscription, subscription)) { if (BackpressureUtils.validate(subscriber.subscription, subscription)) {
subscriber.subscription = subscription; subscriber.subscription = subscription;
subscriber.changeState(this, REQUESTED); if (subscriber.changeState(this, REQUESTED)) {
subscription.request(1); subscription.request(1);
}
} }
} }
}, },
@ -186,15 +187,17 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
@Override @Override
void onNext(AbstractResponseBodySubscriber subscriber, void onNext(AbstractResponseBodySubscriber subscriber,
DataBuffer dataBuffer) { DataBuffer dataBuffer) {
subscriber.changeState(this, RECEIVED); if (subscriber.changeState(this, RECEIVED)) {
subscriber.receiveBuffer(dataBuffer); subscriber.receiveBuffer(dataBuffer);
}
} }
@Override @Override
void onComplete(AbstractResponseBodySubscriber subscriber) { void onComplete(AbstractResponseBodySubscriber subscriber) {
subscriber.subscriptionCompleted = true; if (subscriber.changeState(this, COMPLETED)) {
subscriber.changeState(this, COMPLETED); subscriber.subscriptionCompleted = true;
subscriber.close(); subscriber.close();
}
} }
}, },
/** /**
@ -217,12 +220,14 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
subscriber.releaseBuffer(); subscriber.releaseBuffer();
boolean subscriptionCompleted = subscriber.subscriptionCompleted; boolean subscriptionCompleted = subscriber.subscriptionCompleted;
if (!subscriptionCompleted) { if (!subscriptionCompleted) {
subscriber.changeState(this, REQUESTED); if (subscriber.changeState(this, REQUESTED)) {
subscriber.subscription.request(1); subscriber.subscription.request(1);
}
} }
else { else {
subscriber.changeState(this, COMPLETED); if (subscriber.changeState(this, COMPLETED)) {
subscriber.close(); subscriber.close();
}
} }
} }
} }
@ -258,7 +263,7 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
}; };
void onSubscribe(AbstractResponseBodySubscriber subscriber, Subscription s) { void onSubscribe(AbstractResponseBodySubscriber subscriber, Subscription s) {
throw new IllegalStateException(toString()); s.cancel();
} }
void onNext(AbstractResponseBodySubscriber subscriber, DataBuffer dataBuffer) { void onNext(AbstractResponseBodySubscriber subscriber, DataBuffer dataBuffer) {
@ -266,9 +271,10 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
} }
void onError(AbstractResponseBodySubscriber subscriber, Throwable t) { void onError(AbstractResponseBodySubscriber subscriber, Throwable t) {
subscriber.changeState(this, COMPLETED); if (subscriber.changeState(this, COMPLETED)) {
subscriber.writeError(t); subscriber.writeError(t);
subscriber.close(); subscriber.close();
}
} }
void onComplete(AbstractResponseBodySubscriber subscriber) { void onComplete(AbstractResponseBodySubscriber subscriber) {

View File

@ -269,7 +269,8 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
onWritePossible(); onWritePossible();
} }
} }
catch (IOException ignored) { catch (IOException ex) {
onError(ex);
} }
} }
@ -277,13 +278,12 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
protected boolean write(DataBuffer dataBuffer) throws IOException { protected boolean write(DataBuffer dataBuffer) throws IOException {
ServletOutputStream output = outputStream(); ServletOutputStream output = outputStream();
boolean ready = output.isReady();
if (this.flushOnNext) { if (this.flushOnNext) {
flush(); flush();
ready = output.isReady();
} }
boolean ready = output.isReady();
if (this.logger.isTraceEnabled()) { if (this.logger.isTraceEnabled()) {
this.logger.trace("write: " + dataBuffer + " ready: " + ready); this.logger.trace("write: " + dataBuffer + " ready: " + ready);
} }