Properly handle completion while in READING state

Closes gh-26834
This commit is contained in:
Rossen Stoyanchev 2021-04-29 22:09:03 +01:00
parent ae7d5200e5
commit f7fc36250e
2 changed files with 42 additions and 23 deletions

View File

@ -68,10 +68,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Nullable
private volatile Subscriber<? super T> subscriber;
private volatile boolean completionBeforeDemand;
private volatile boolean completionPending;
@Nullable
private volatile Throwable errorBeforeDemand;
private volatile Throwable errorPending;
private final String logPrefix;
@ -228,21 +228,24 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
}
private void handleCompletionOrErrorBeforeDemand() {
private boolean handlePendingCompletionOrError() {
State state = this.state.get();
if (!state.equals(State.UNSUBSCRIBED) && !state.equals(State.SUBSCRIBING)) {
if (this.completionBeforeDemand) {
rsReadLogger.trace(getLogPrefix() + "Completed before demand");
if (state.equals(State.DEMAND) || state.equals(State.NO_DEMAND)) {
if (this.completionPending) {
rsReadLogger.trace(getLogPrefix() + "Processing pending completion");
this.state.get().onAllDataRead(this);
return true;
}
Throwable ex = this.errorBeforeDemand;
Throwable ex = this.errorPending;
if (ex != null) {
if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(getLogPrefix() + "Completed with error before demand: " + ex);
rsReadLogger.trace(getLogPrefix() + "Processing pending completion with error: " + ex);
}
this.state.get().onError(this, ex);
return true;
}
}
return false;
}
private Subscription createSubscription() {
@ -305,7 +308,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
publisher.changeState(SUBSCRIBING, NO_DEMAND);
publisher.handleCompletionOrErrorBeforeDemand();
publisher.handlePendingCompletionOrError();
}
else {
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
@ -315,14 +318,14 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionBeforeDemand = true;
publisher.handleCompletionOrErrorBeforeDemand();
publisher.completionPending = true;
publisher.handlePendingCompletionOrError();
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
publisher.handleCompletionOrErrorBeforeDemand();
publisher.errorPending = ex;
publisher.handlePendingCompletionOrError();
}
},
@ -341,14 +344,14 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionBeforeDemand = true;
publisher.handleCompletionOrErrorBeforeDemand();
publisher.completionPending = true;
publisher.handlePendingCompletionOrError();
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
publisher.handleCompletionOrErrorBeforeDemand();
publisher.errorPending = ex;
publisher.handlePendingCompletionOrError();
}
},
@ -379,14 +382,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
boolean demandAvailable = publisher.readAndPublish();
if (demandAvailable) {
publisher.changeToDemandState(READING);
publisher.handlePendingCompletionOrError();
}
else {
publisher.readingPaused();
if (publisher.changeState(READING, NO_DEMAND)) {
// Demand may have arrived since readAndPublish returned
long r = publisher.demand;
if (r > 0) {
publisher.changeToDemandState(NO_DEMAND);
if (!publisher.handlePendingCompletionOrError()) {
// Demand may have arrived since readAndPublish returned
long r = publisher.demand;
if (r > 0) {
publisher.changeToDemandState(NO_DEMAND);
}
}
}
}
@ -408,6 +414,18 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
publisher.changeToDemandState(NO_DEMAND);
}
}
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionPending = true;
publisher.handlePendingCompletionOrError();
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorPending = ex;
publisher.handlePendingCompletionOrError();
}
},
COMPLETED {

View File

@ -151,10 +151,11 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
* container.
*/
public final void onWritePossible() {
State state = this.state.get();
if (rsWriteLogger.isTraceEnabled()) {
rsWriteLogger.trace(getLogPrefix() + "onWritePossible");
rsWriteLogger.trace(getLogPrefix() + "onWritePossible [" + state + "]");
}
this.state.get().onWritePossible(this);
state.onWritePossible(this);
}
/**