Fix race condition with onCompletion/onError
Closes gh-23096
This commit is contained in:
parent
8189c90741
commit
dd22b8fd39
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -283,19 +283,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
|||
publisher.subscriber = subscriber;
|
||||
subscriber.onSubscribe(subscription);
|
||||
publisher.changeState(SUBSCRIBING, NO_DEMAND);
|
||||
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
|
||||
String logPrefix = publisher.getLogPrefix();
|
||||
if (publisher.completionBeforeDemand) {
|
||||
rsReadLogger.trace(logPrefix + "Completed before demand");
|
||||
publisher.state.get().onAllDataRead(publisher);
|
||||
}
|
||||
Throwable ex = publisher.errorBeforeDemand;
|
||||
if (ex != null) {
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex);
|
||||
}
|
||||
publisher.state.get().onError(publisher, ex);
|
||||
}
|
||||
handleCompletionOrErrorBeforeDemand(publisher);
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
|
||||
|
@ -306,11 +294,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
|||
@Override
|
||||
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
|
||||
publisher.completionBeforeDemand = true;
|
||||
handleCompletionOrErrorBeforeDemand(publisher);
|
||||
}
|
||||
|
||||
@Override
|
||||
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
|
||||
publisher.errorBeforeDemand = ex;
|
||||
handleCompletionOrErrorBeforeDemand(publisher);
|
||||
}
|
||||
|
||||
private <T> void handleCompletionOrErrorBeforeDemand(AbstractListenerReadPublisher<T> publisher) {
|
||||
if (publisher.state.get().equals(NO_DEMAND)) {
|
||||
if (publisher.completionBeforeDemand) {
|
||||
rsReadLogger.trace(publisher.getLogPrefix() + "Completed before demand");
|
||||
publisher.state.get().onAllDataRead(publisher);
|
||||
}
|
||||
Throwable ex = publisher.errorBeforeDemand;
|
||||
if (ex != null) {
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
String prefix = publisher.getLogPrefix();
|
||||
rsReadLogger.trace(prefix + "Completed with error before demand: " + ex);
|
||||
}
|
||||
publisher.state.get().onError(publisher, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -282,17 +282,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
|||
}
|
||||
if (processor.changeState(this, REQUESTED)) {
|
||||
if (processor.subscriberCompleted) {
|
||||
if (processor.isFlushPending()) {
|
||||
// Ensure the final flush
|
||||
processor.changeState(REQUESTED, FLUSHING);
|
||||
processor.flushIfPossible();
|
||||
}
|
||||
else if (processor.changeState(REQUESTED, COMPLETED)) {
|
||||
processor.resultPublisher.publishComplete();
|
||||
}
|
||||
else {
|
||||
processor.state.get().onComplete(processor);
|
||||
}
|
||||
handleSubscriberCompleted(processor);
|
||||
}
|
||||
else {
|
||||
Assert.state(processor.subscription != null, "No subscription");
|
||||
|
@ -303,6 +293,24 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
|||
@Override
|
||||
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
|
||||
processor.subscriberCompleted = true;
|
||||
// A competing write might have completed very quickly
|
||||
if (processor.state.get().equals(State.REQUESTED)) {
|
||||
handleSubscriberCompleted(processor);
|
||||
}
|
||||
}
|
||||
|
||||
private <T> void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor<T> processor) {
|
||||
if (processor.isFlushPending()) {
|
||||
// Ensure the final flush
|
||||
processor.changeState(State.REQUESTED, State.FLUSHING);
|
||||
processor.flushIfPossible();
|
||||
}
|
||||
else if (processor.changeState(State.REQUESTED, State.COMPLETED)) {
|
||||
processor.resultPublisher.publishComplete();
|
||||
}
|
||||
else {
|
||||
processor.state.get().onComplete(processor);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -376,6 +376,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
|||
@Override
|
||||
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
|
||||
processor.subscriberCompleted = true;
|
||||
// A competing write might have completed very quickly
|
||||
if (processor.state.get().equals(State.REQUESTED)) {
|
||||
processor.changeStateToComplete(State.REQUESTED);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
|
@ -383,6 +387,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
|||
@Override
|
||||
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
|
||||
processor.subscriberCompleted = true;
|
||||
// A competing write might have completed very quickly
|
||||
if (processor.state.get().equals(State.REQUESTED)) {
|
||||
processor.changeStateToComplete(State.REQUESTED);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
|
|
Loading…
Reference in New Issue