From 41b13a4e8a7dfe2b4536a016532fccbe81a94e0c Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 21 Nov 2017 00:25:38 +0200 Subject: [PATCH] Ensure onComplete/onError events will be delivered Issue: SPR-16207 --- .../AbstractListenerReadPublisher.java | 10 +++++- .../AbstractListenerWriteFlushProcessor.java | 12 +++++++ .../AbstractListenerWriteProcessor.java | 34 +++++++++++++++---- .../server/reactive/WriteResultPublisher.java | 10 +++++- 4 files changed, 57 insertions(+), 9 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 6398e71f89e..bcdc2538e5f 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -312,7 +312,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } void cancel(AbstractListenerReadPublisher publisher) { - publisher.changeState(this, COMPLETED); + if (!publisher.changeState(this, COMPLETED)) { + publisher.state.get().cancel(publisher); + } } void onDataAvailable(AbstractListenerReadPublisher publisher) { @@ -325,6 +327,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.subscriber.onComplete(); } } + else { + publisher.state.get().onAllDataRead(publisher); + } } void onError(AbstractListenerReadPublisher publisher, Throwable t) { @@ -333,6 +338,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.subscriber.onError(t); } } + else { + publisher.state.get().onError(publisher, t); + } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 6bde76013df..613909d3e9a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -190,6 +190,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } + else { + processor.state.get().onComplete(processor); + } } }, @@ -212,6 +215,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo else if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } + else { + processor.state.get().onComplete(processor); + } } else { if (processor.changeState(this, REQUESTED)) { @@ -238,6 +244,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } + else { + processor.state.get().onComplete(processor); + } } public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { // ignore @@ -275,6 +284,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishError(ex); } + else { + processor.state.get().onError(processor, ex); + } } public void onComplete(AbstractListenerWriteFlushProcessor processor) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index fa59a40be2d..efd91c58344 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -251,6 +251,9 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements Processor void onComplete(AbstractListenerWriteProcessor processor) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index 2a2f3dfdc9e..fe4d8b53a85 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -158,6 +158,9 @@ class WriteResultPublisher implements Publisher { Assert.state(publisher.subscriber != null, "No subscriber"); publisher.subscriber.onComplete(); } + else { + publisher.state.get().publishComplete(publisher); + } } @Override void publishError(WriteResultPublisher publisher, Throwable t) { @@ -165,6 +168,9 @@ class WriteResultPublisher implements Publisher { Assert.state(publisher.subscriber != null, "No subscriber"); publisher.subscriber.onError(t); } + else { + publisher.state.get().publishError(publisher, t); + } } }, @@ -196,7 +202,9 @@ class WriteResultPublisher implements Publisher { } void cancel(WriteResultPublisher publisher) { - publisher.changeState(this, COMPLETED); + if (!publisher.changeState(this, COMPLETED)) { + publisher.state.get().cancel(publisher); + } } void publishComplete(WriteResultPublisher publisher) {