From f7fc36250e815ca4bd886eb366382b9983706ced Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 29 Apr 2021 22:09:03 +0100 Subject: [PATCH] Properly handle completion while in READING state Closes gh-26834 --- .../AbstractListenerReadPublisher.java | 60 ++++++++++++------- .../AbstractListenerWriteProcessor.java | 5 +- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index a432dc7a78..21776e751d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -68,10 +68,10 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Nullable private volatile Subscriber subscriber; - private volatile boolean completionBeforeDemand; + private volatile boolean completionPending; @Nullable - private volatile Throwable errorBeforeDemand; + private volatile Throwable errorPending; private final String logPrefix; @@ -228,21 +228,24 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } } - private void handleCompletionOrErrorBeforeDemand() { + private boolean handlePendingCompletionOrError() { State state = this.state.get(); - if (!state.equals(State.UNSUBSCRIBED) && !state.equals(State.SUBSCRIBING)) { - if (this.completionBeforeDemand) { - rsReadLogger.trace(getLogPrefix() + "Completed before demand"); + if (state.equals(State.DEMAND) || state.equals(State.NO_DEMAND)) { + if (this.completionPending) { + rsReadLogger.trace(getLogPrefix() + "Processing pending completion"); this.state.get().onAllDataRead(this); + return true; } - Throwable ex = this.errorBeforeDemand; + Throwable ex = this.errorPending; if (ex != null) { if (rsReadLogger.isTraceEnabled()) { - rsReadLogger.trace(getLogPrefix() + "Completed with error before demand: " + ex); + rsReadLogger.trace(getLogPrefix() + "Processing pending completion with error: " + ex); } this.state.get().onError(this, ex); + return true; } } + return false; } private Subscription createSubscription() { @@ -305,7 +308,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.subscriber = subscriber; subscriber.onSubscribe(subscription); publisher.changeState(SUBSCRIBING, NO_DEMAND); - publisher.handleCompletionOrErrorBeforeDemand(); + publisher.handlePendingCompletionOrError(); } else { throw new IllegalStateException("Failed to transition to SUBSCRIBING, " + @@ -315,14 +318,14 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override void onAllDataRead(AbstractListenerReadPublisher publisher) { - publisher.completionBeforeDemand = true; - publisher.handleCompletionOrErrorBeforeDemand(); + publisher.completionPending = true; + publisher.handlePendingCompletionOrError(); } @Override void onError(AbstractListenerReadPublisher publisher, Throwable ex) { - publisher.errorBeforeDemand = ex; - publisher.handleCompletionOrErrorBeforeDemand(); + publisher.errorPending = ex; + publisher.handlePendingCompletionOrError(); } }, @@ -341,14 +344,14 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override void onAllDataRead(AbstractListenerReadPublisher publisher) { - publisher.completionBeforeDemand = true; - publisher.handleCompletionOrErrorBeforeDemand(); + publisher.completionPending = true; + publisher.handlePendingCompletionOrError(); } @Override void onError(AbstractListenerReadPublisher publisher, Throwable ex) { - publisher.errorBeforeDemand = ex; - publisher.handleCompletionOrErrorBeforeDemand(); + publisher.errorPending = ex; + publisher.handlePendingCompletionOrError(); } }, @@ -379,14 +382,17 @@ public abstract class AbstractListenerReadPublisher implements Publisher { boolean demandAvailable = publisher.readAndPublish(); if (demandAvailable) { publisher.changeToDemandState(READING); + publisher.handlePendingCompletionOrError(); } else { publisher.readingPaused(); if (publisher.changeState(READING, NO_DEMAND)) { - // Demand may have arrived since readAndPublish returned - long r = publisher.demand; - if (r > 0) { - publisher.changeToDemandState(NO_DEMAND); + if (!publisher.handlePendingCompletionOrError()) { + // Demand may have arrived since readAndPublish returned + long r = publisher.demand; + if (r > 0) { + publisher.changeToDemandState(NO_DEMAND); + } } } } @@ -408,6 +414,18 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.changeToDemandState(NO_DEMAND); } } + + @Override + void onAllDataRead(AbstractListenerReadPublisher publisher) { + publisher.completionPending = true; + publisher.handlePendingCompletionOrError(); + } + + @Override + void onError(AbstractListenerReadPublisher publisher, Throwable ex) { + publisher.errorPending = ex; + publisher.handlePendingCompletionOrError(); + } }, COMPLETED { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 6cfd8412a6..b1a2a3affe 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -151,10 +151,11 @@ public abstract class AbstractListenerWriteProcessor implements Processor