Set readyToWrite flag after cached signals emitted

Issue: SPR-16555
This commit is contained in:
Rossen Stoyanchev 2018-03-05 23:20:50 -05:00
parent 6d26e61ac7
commit e48def2d35
1 changed files with 64 additions and 32 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2017 the original author or authors. * Copyright 2002-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -75,6 +75,33 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
} }
private enum State {
/** No emissions from the upstream source yet */
NEW,
/**
* At least one signal of any kind has been received; we're ready to
* call the write function and proceed with actual writing.
*/
FIRST_SIGNAL_RECEIVED,
/**
* The write subscriber has subscribed and requested; we're going to
* emit the cached signals.
*/
EMITTING_CACHED_SIGNALS,
/**
* The write subscriber has subscribed, and cached signals have been
* emitted to it; we're ready to switch to a simple pass-through mode
* for all remaining signals.
**/
READY_TO_WRITE
}
/** /**
* A barrier inserted between the write source and the write subscriber * A barrier inserted between the write source and the write subscriber
* (i.e. the HTTP server adapter) that pre-fetches and waits for the first * (i.e. the HTTP server adapter) that pre-fetches and waits for the first
@ -99,27 +126,23 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Nullable @Nullable
private Subscription subscription; private Subscription subscription;
/** /** Cached data item before readyToWrite */
* We've at at least one emission, we've called the write function, the write
* subscriber has subscribed and cached signals have been emitted to it.
* We're now simply passing data through to the write subscriber.
**/
private boolean readyToWrite = false;
/** No emission from upstream yet */
private boolean beforeFirstEmission = true;
/** Cached signal before readyToWrite */
@Nullable @Nullable
private T item; private T item;
/** Cached 1st/2nd signal before readyToWrite */ /** Cached error signal before readyToWrite */
@Nullable @Nullable
private Throwable error; private Throwable error;
/** Cached 1st/2nd signal before readyToWrite */ /** Cached onComplete signal before readyToWrite */
private boolean completed = false; private boolean completed = false;
/** Recursive demand while emitting cached signals */
private long demandBeforeReadyToWrite;
/** Current state */
private State state = State.NEW;
/** The actual writeSubscriber from the HTTP server adapter */ /** The actual writeSubscriber from the HTTP server adapter */
@Nullable @Nullable
private Subscriber<? super T> writeSubscriber; private Subscriber<? super T> writeSubscriber;
@ -143,18 +166,18 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override @Override
public final void onNext(T item) { public final void onNext(T item) {
if (this.readyToWrite) { if (this.state == State.READY_TO_WRITE) {
requiredWriteSubscriber().onNext(item); requiredWriteSubscriber().onNext(item);
return; return;
} }
//FIXME revisit in case of reentrant sync deadlock //FIXME revisit in case of reentrant sync deadlock
synchronized (this) { synchronized (this) {
if (this.readyToWrite) { if (this.state == State.READY_TO_WRITE) {
requiredWriteSubscriber().onNext(item); requiredWriteSubscriber().onNext(item);
} }
else if (this.beforeFirstEmission) { else if (this.state == State.NEW) {
this.item = item; this.item = item;
this.beforeFirstEmission = false; this.state = State.FIRST_SIGNAL_RECEIVED;
writeFunction.apply(this).subscribe(this.writeCompletionBarrier); writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
} }
else { else {
@ -173,16 +196,16 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override @Override
public final void onError(Throwable ex) { public final void onError(Throwable ex) {
if (this.readyToWrite) { if (this.state == State.READY_TO_WRITE) {
requiredWriteSubscriber().onError(ex); requiredWriteSubscriber().onError(ex);
return; return;
} }
synchronized (this) { synchronized (this) {
if (this.readyToWrite) { if (this.state == State.READY_TO_WRITE) {
requiredWriteSubscriber().onError(ex); requiredWriteSubscriber().onError(ex);
} }
else if (this.beforeFirstEmission) { else if (this.state == State.NEW) {
this.beforeFirstEmission = false; this.state = State.FIRST_SIGNAL_RECEIVED;
this.writeCompletionBarrier.onError(ex); this.writeCompletionBarrier.onError(ex);
} }
else { else {
@ -193,17 +216,17 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override @Override
public final void onComplete() { public final void onComplete() {
if (this.readyToWrite) { if (this.state == State.READY_TO_WRITE) {
requiredWriteSubscriber().onComplete(); requiredWriteSubscriber().onComplete();
return; return;
} }
synchronized (this) { synchronized (this) {
if (this.readyToWrite) { if (this.state == State.READY_TO_WRITE) {
requiredWriteSubscriber().onComplete(); requiredWriteSubscriber().onComplete();
} }
else if (this.beforeFirstEmission) { else if (this.state == State.NEW) {
this.completed = true; this.completed = true;
this.beforeFirstEmission = false; this.state = State.FIRST_SIGNAL_RECEIVED;
writeFunction.apply(this).subscribe(this.writeCompletionBarrier); writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
} }
else { else {
@ -226,19 +249,28 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
if (s == null) { if (s == null) {
return; return;
} }
if (this.readyToWrite) { if (this.state == State.READY_TO_WRITE) {
s.request(n); s.request(n);
return; return;
} }
synchronized (this) { synchronized (this) {
if (this.writeSubscriber != null) { if (this.writeSubscriber != null) {
this.readyToWrite = true; if (this.state == State.EMITTING_CACHED_SIGNALS) {
if (emitCachedSignals()) { this.demandBeforeReadyToWrite = n;
return; return;
} }
n--; try {
if (n == 0) { this.state = State.EMITTING_CACHED_SIGNALS;
return; if (emitCachedSignals()) {
return;
}
n = n + this.demandBeforeReadyToWrite - 1;
if (n == 0) {
return;
}
}
finally {
this.state = State.READY_TO_WRITE;
} }
} }
} }