Equalize copy of ChannelSendOperator

See gh-23175
This commit is contained in:
Rossen Stoyanchev 2019-06-21 14:16:44 +01:00
parent 030caea9cf
commit 003247dc40
1 changed files with 9 additions and 1 deletions

View File

@ -246,7 +246,15 @@ class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
else if (this.state == State.NEW) {
this.completed = true;
this.state = State.FIRST_SIGNAL_RECEIVED;
writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
Publisher<Void> result;
try {
result = writeFunction.apply(this);
}
catch (Throwable ex) {
this.writeCompletionBarrier.onError(ex);
return;
}
result.subscribe(this.writeCompletionBarrier);
}
else {
this.completed = true;