Fix race condition with onCompletion/onError
Closes gh-23096
This commit is contained in:
parent
8189c90741
commit
dd22b8fd39
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -283,19 +283,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||||
publisher.subscriber = subscriber;
|
publisher.subscriber = subscriber;
|
||||||
subscriber.onSubscribe(subscription);
|
subscriber.onSubscribe(subscription);
|
||||||
publisher.changeState(SUBSCRIBING, NO_DEMAND);
|
publisher.changeState(SUBSCRIBING, NO_DEMAND);
|
||||||
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
|
handleCompletionOrErrorBeforeDemand(publisher);
|
||||||
String logPrefix = publisher.getLogPrefix();
|
|
||||||
if (publisher.completionBeforeDemand) {
|
|
||||||
rsReadLogger.trace(logPrefix + "Completed before demand");
|
|
||||||
publisher.state.get().onAllDataRead(publisher);
|
|
||||||
}
|
|
||||||
Throwable ex = publisher.errorBeforeDemand;
|
|
||||||
if (ex != null) {
|
|
||||||
if (rsReadLogger.isTraceEnabled()) {
|
|
||||||
rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex);
|
|
||||||
}
|
|
||||||
publisher.state.get().onError(publisher, ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
|
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
|
||||||
|
@ -306,11 +294,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||||
@Override
|
@Override
|
||||||
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
|
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
|
||||||
publisher.completionBeforeDemand = true;
|
publisher.completionBeforeDemand = true;
|
||||||
|
handleCompletionOrErrorBeforeDemand(publisher);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
|
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
|
||||||
publisher.errorBeforeDemand = ex;
|
publisher.errorBeforeDemand = ex;
|
||||||
|
handleCompletionOrErrorBeforeDemand(publisher);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> void handleCompletionOrErrorBeforeDemand(AbstractListenerReadPublisher<T> publisher) {
|
||||||
|
if (publisher.state.get().equals(NO_DEMAND)) {
|
||||||
|
if (publisher.completionBeforeDemand) {
|
||||||
|
rsReadLogger.trace(publisher.getLogPrefix() + "Completed before demand");
|
||||||
|
publisher.state.get().onAllDataRead(publisher);
|
||||||
|
}
|
||||||
|
Throwable ex = publisher.errorBeforeDemand;
|
||||||
|
if (ex != null) {
|
||||||
|
if (rsReadLogger.isTraceEnabled()) {
|
||||||
|
String prefix = publisher.getLogPrefix();
|
||||||
|
rsReadLogger.trace(prefix + "Completed with error before demand: " + ex);
|
||||||
|
}
|
||||||
|
publisher.state.get().onError(publisher, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -282,17 +282,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
||||||
}
|
}
|
||||||
if (processor.changeState(this, REQUESTED)) {
|
if (processor.changeState(this, REQUESTED)) {
|
||||||
if (processor.subscriberCompleted) {
|
if (processor.subscriberCompleted) {
|
||||||
if (processor.isFlushPending()) {
|
handleSubscriberCompleted(processor);
|
||||||
// Ensure the final flush
|
|
||||||
processor.changeState(REQUESTED, FLUSHING);
|
|
||||||
processor.flushIfPossible();
|
|
||||||
}
|
|
||||||
else if (processor.changeState(REQUESTED, COMPLETED)) {
|
|
||||||
processor.resultPublisher.publishComplete();
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
processor.state.get().onComplete(processor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
Assert.state(processor.subscription != null, "No subscription");
|
Assert.state(processor.subscription != null, "No subscription");
|
||||||
|
@ -303,6 +293,24 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
||||||
@Override
|
@Override
|
||||||
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
|
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
|
||||||
processor.subscriberCompleted = true;
|
processor.subscriberCompleted = true;
|
||||||
|
// A competing write might have completed very quickly
|
||||||
|
if (processor.state.get().equals(State.REQUESTED)) {
|
||||||
|
handleSubscriberCompleted(processor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor<T> processor) {
|
||||||
|
if (processor.isFlushPending()) {
|
||||||
|
// Ensure the final flush
|
||||||
|
processor.changeState(State.REQUESTED, State.FLUSHING);
|
||||||
|
processor.flushIfPossible();
|
||||||
|
}
|
||||||
|
else if (processor.changeState(State.REQUESTED, State.COMPLETED)) {
|
||||||
|
processor.resultPublisher.publishComplete();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
processor.state.get().onComplete(processor);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -376,6 +376,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||||
@Override
|
@Override
|
||||||
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
|
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
|
||||||
processor.subscriberCompleted = true;
|
processor.subscriberCompleted = true;
|
||||||
|
// A competing write might have completed very quickly
|
||||||
|
if (processor.state.get().equals(State.REQUESTED)) {
|
||||||
|
processor.changeStateToComplete(State.REQUESTED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -383,6 +387,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||||
@Override
|
@Override
|
||||||
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
|
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
|
||||||
processor.subscriberCompleted = true;
|
processor.subscriberCompleted = true;
|
||||||
|
// A competing write might have completed very quickly
|
||||||
|
if (processor.state.get().equals(State.REQUESTED)) {
|
||||||
|
processor.changeStateToComplete(State.REQUESTED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue