Consistently apply onCompletion/onError handling

Follow-up change in addition to dd22b8fd.

See gh-23096
This commit is contained in:
Rossen Stoyanchev 2019-08-29 09:20:50 +03:00
parent 4a4edeb97f
commit 88e9dcef0c
1 changed files with 22 additions and 20 deletions

View File

@ -224,6 +224,23 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
}
private void handleCompletionOrErrorBeforeDemand() {
State state = this.state.get();
if (!state.equals(State.UNSUBSCRIBED) && !state.equals(State.SUBSCRIBING)) {
if (this.completionBeforeDemand) {
rsReadLogger.trace(getLogPrefix() + "Completed before demand");
this.state.get().onAllDataRead(this);
}
Throwable ex = this.errorBeforeDemand;
if (ex != null) {
if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(getLogPrefix() + "Completed with error before demand: " + ex);
}
this.state.get().onError(this, ex);
}
}
}
private Subscription createSubscription() {
return new ReadSubscription();
}
@ -283,7 +300,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
publisher.changeState(SUBSCRIBING, NO_DEMAND);
handleCompletionOrErrorBeforeDemand(publisher);
publisher.handleCompletionOrErrorBeforeDemand();
}
else {
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
@ -294,30 +311,13 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionBeforeDemand = true;
handleCompletionOrErrorBeforeDemand(publisher);
publisher.handleCompletionOrErrorBeforeDemand();
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
handleCompletionOrErrorBeforeDemand(publisher);
}
private <T> void handleCompletionOrErrorBeforeDemand(AbstractListenerReadPublisher<T> publisher) {
if (publisher.state.get().equals(NO_DEMAND)) {
if (publisher.completionBeforeDemand) {
rsReadLogger.trace(publisher.getLogPrefix() + "Completed before demand");
publisher.state.get().onAllDataRead(publisher);
}
Throwable ex = publisher.errorBeforeDemand;
if (ex != null) {
if (rsReadLogger.isTraceEnabled()) {
String prefix = publisher.getLogPrefix();
rsReadLogger.trace(prefix + "Completed with error before demand: " + ex);
}
publisher.state.get().onError(publisher, ex);
}
}
publisher.handleCompletionOrErrorBeforeDemand();
}
},
@ -337,11 +337,13 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionBeforeDemand = true;
publisher.handleCompletionOrErrorBeforeDemand();
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
publisher.handleCompletionOrErrorBeforeDemand();
}
},