From 4798a1eb026d6b51bb15bcd64de1d349bf2183e6 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 26 Jul 2016 16:20:56 +0300 Subject: [PATCH] Cancel Subscription when onError is invoked internally AbstractResponseBodyProcessor.onError and AbstractResponseBodyFlushProcessor.onError will be invoked when: - The Publisher wants to signal with onError that there are failures. Once onError is invoked the Subscription should be considered canceled. - The internal implementation wants to signal with onError that there are failures. In this use case the implementation should invoke Subscription.cancel() --- .../reactive/AbstractResponseBodyFlushProcessor.java | 8 +++++++- .../server/reactive/AbstractResponseBodyProcessor.java | 6 +++++- .../http/server/reactive/ServletServerHttpResponse.java | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java index 71adf24dd1..811cb78a96 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -106,6 +106,10 @@ abstract class AbstractResponseBodyFlushProcessor */ protected abstract void flush() throws IOException; + private void cancel() { + this.subscription.cancel(); + } + private void writeComplete() { if (logger.isTraceEnabled()) { logger.trace(this.state + " writeComplete"); @@ -157,11 +161,12 @@ abstract class AbstractResponseBodyFlushProcessor else { try { processor.flush(); + processor.subscription.request(1); } catch (IOException ex) { + processor.cancel(); processor.onError(ex); } - processor.subscription.request(1); } } }, COMPLETED { @@ -231,6 +236,7 @@ abstract class AbstractResponseBodyFlushProcessor @Override public void onError(Throwable t) { + processor.cancel(); processor.onError(t); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java index 1a0268cd07..2463fb9414 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java @@ -159,6 +159,10 @@ abstract class AbstractResponseBodyProcessor implements Processor