Cancel Subscription when onError is invoked internally

AbstractResponseBodyProcessor.onError and
AbstractResponseBodyFlushProcessor.onError will be invoked when:
- The Publisher wants to signal with onError that there are failures.
Once onError is invoked the Subscription should be considered canceled.
- The internal implementation wants to signal with onError that there
are failures. In this use case the implementation should invoke
Subscription.cancel()
This commit is contained in:
Violeta Georgieva 2016-07-26 16:20:56 +03:00 committed by Rossen Stoyanchev
parent 16939b7bc7
commit 4798a1eb02
3 changed files with 13 additions and 2 deletions

View File

@ -106,6 +106,10 @@ abstract class AbstractResponseBodyFlushProcessor
*/
protected abstract void flush() throws IOException;
private void cancel() {
this.subscription.cancel();
}
private void writeComplete() {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " writeComplete");
@ -157,11 +161,12 @@ abstract class AbstractResponseBodyFlushProcessor
else {
try {
processor.flush();
processor.subscription.request(1);
}
catch (IOException ex) {
processor.cancel();
processor.onError(ex);
}
processor.subscription.request(1);
}
}
}, COMPLETED {
@ -231,6 +236,7 @@ abstract class AbstractResponseBodyFlushProcessor
@Override
public void onError(Throwable t) {
processor.cancel();
processor.onError(t);
}

View File

@ -159,6 +159,10 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
*/
protected abstract boolean write(DataBuffer dataBuffer) throws IOException;
protected void cancel() {
this.subscription.cancel();
}
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
@ -220,7 +224,6 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
@Override
void onComplete(AbstractResponseBodyProcessor processor) {
if (processor.changeState(this, COMPLETED)) {
processor.subscriberCompleted = true;
processor.publisherDelegate.publishComplete();
}
}
@ -258,6 +261,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
}
catch (IOException ex) {
processor.cancel();
processor.onError(ex);
}
}

View File

@ -224,6 +224,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
@Override
public void onError(Throwable ex) {
if (bodyProcessor != null) {
bodyProcessor.cancel();
bodyProcessor.onError(ex);
}
}