diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 6398e71f89..bcdc2538e5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -312,7 +312,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } void cancel(AbstractListenerReadPublisher publisher) { - publisher.changeState(this, COMPLETED); + if (!publisher.changeState(this, COMPLETED)) { + publisher.state.get().cancel(publisher); + } } void onDataAvailable(AbstractListenerReadPublisher publisher) { @@ -325,6 +327,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.subscriber.onComplete(); } } + else { + publisher.state.get().onAllDataRead(publisher); + } } void onError(AbstractListenerReadPublisher publisher, Throwable t) { @@ -333,6 +338,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.subscriber.onError(t); } } + else { + publisher.state.get().onError(publisher, t); + } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 6bde76013d..613909d3e9 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -190,6 +190,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } + else { + processor.state.get().onComplete(processor); + } } }, @@ -212,6 +215,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo else if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } + else { + processor.state.get().onComplete(processor); + } } else { if (processor.changeState(this, REQUESTED)) { @@ -238,6 +244,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } + else { + processor.state.get().onComplete(processor); + } } public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { // ignore @@ -275,6 +284,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishError(ex); } + else { + processor.state.get().onError(processor, ex); + } } public void onComplete(AbstractListenerWriteFlushProcessor processor) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index fa59a40be2..efd91c5834 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -251,6 +251,9 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements Processor void onComplete(AbstractListenerWriteProcessor processor) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index 2a2f3dfdc9..fe4d8b53a8 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -158,6 +158,9 @@ class WriteResultPublisher implements Publisher { Assert.state(publisher.subscriber != null, "No subscriber"); publisher.subscriber.onComplete(); } + else { + publisher.state.get().publishComplete(publisher); + } } @Override void publishError(WriteResultPublisher publisher, Throwable t) { @@ -165,6 +168,9 @@ class WriteResultPublisher implements Publisher { Assert.state(publisher.subscriber != null, "No subscriber"); publisher.subscriber.onError(t); } + else { + publisher.state.get().publishError(publisher, t); + } } }, @@ -196,7 +202,9 @@ class WriteResultPublisher implements Publisher { } void cancel(WriteResultPublisher publisher) { - publisher.changeState(this, COMPLETED); + if (!publisher.changeState(this, COMPLETED)) { + publisher.state.get().cancel(publisher); + } } void publishComplete(WriteResultPublisher publisher) {