Improve write completion handling in ChannelSendOperator

Avoid re-using the WriteBarrier as the Subscription to both the
completionSubscriber and the writeSubscriber.

Instead DownstreamBridge is now called WriteCompletionBarrier and is
both a Subscriber and Subscription, i.e. handles all signals to and
from the completion Subscriber.

This frees the request method implementation in WriteBarrier to assume
it can only be called by the writeSubscriber and hence it's safe to
pass on request signals to the write source outside the synchronized
block.
This commit is contained in:
Rossen Stoyanchev 2017-07-12 18:44:55 +02:00
parent 6329ccb327
commit 8359201a0d
1 changed files with 73 additions and 24 deletions

View File

@ -73,16 +73,30 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override
public void subscribe(CoreSubscriber<? super Void> actual) {
this.source.subscribe(new WriteWithBarrier(actual));
this.source.subscribe(new WriteBarrier(actual));
}
/**
* A barrier between the write source and the write subscriber (i.e. the
* HTTP server adapter) that pre-fetches and waits for the first signal
* before deciding whether to hook in to the write subscriber.
*
* <p>Acts as:
* <ul>
* <li>Subscriber to the write source.
* <li>Subscription to the write subscriber.
* <li>Publisher ot the write subscriber.
* </ul>
*
* <p>Also uses {@link WriteCompletionBarrier} for delegating signals to
* and from the write completion subscriber.
*/
@SuppressWarnings("deprecation")
private final class WriteWithBarrier
implements Publisher<T>, CoreSubscriber<T>, Subscription {
private final class WriteBarrier implements CoreSubscriber<T>, Subscription, Publisher<T> {
/* Downstream write completion subscriber */
private final CoreSubscriber<? super Void> completionSubscriber;
/* Bridges signals to and from the completionSubscriber */
private final WriteCompletionBarrier writeCompletionBarrier;
/* Upstream write source subscription */
@Nullable
@ -109,13 +123,13 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
/** Cached 1st/2nd signal before readyToWrite */
private boolean completed = false;
/** The actual writeSubscriber vs the downstream completion subscriber */
/** The actual writeSubscriber from the HTTP server adapter */
@Nullable
private Subscriber<? super T> writeSubscriber;
WriteWithBarrier(CoreSubscriber<? super Void> completionSubscriber) {
this.completionSubscriber = completionSubscriber;
WriteBarrier(CoreSubscriber<? super Void> completionSubscriber) {
this.writeCompletionBarrier = new WriteCompletionBarrier(completionSubscriber, this);
}
@ -125,7 +139,7 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
public final void onSubscribe(Subscription s) {
if (Operators.validate(this.subscription, s)) {
this.subscription = s;
this.completionSubscriber.onSubscribe(this);
this.writeCompletionBarrier.connect();
s.request(1);
}
}
@ -144,13 +158,13 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
else if (this.beforeFirstEmission) {
this.item = item;
this.beforeFirstEmission = false;
writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber));
writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
}
else {
if (this.subscription != null) {
this.subscription.cancel();
}
this.completionSubscriber.onError(new IllegalStateException("Unexpected item."));
this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item."));
}
}
}
@ -172,7 +186,7 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
}
else if (this.beforeFirstEmission) {
this.beforeFirstEmission = false;
this.completionSubscriber.onError(ex);
this.writeCompletionBarrier.onError(ex);
}
else {
this.error = ex;
@ -193,7 +207,7 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
else if (this.beforeFirstEmission) {
this.completed = true;
this.beforeFirstEmission = false;
writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber));
writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
}
else {
this.completed = true;
@ -203,11 +217,11 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override
public Context currentContext() {
return this.completionSubscriber.currentContext();
return this.writeCompletionBarrier.currentContext();
}
// Subscription methods (we're the subscription to completion~ and writeSubscriber)..
// Subscription methods (we're the Subscription to the writeSubscriber)..
@Override
public void request(long n) {
@ -229,9 +243,9 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
if (n == 0) {
return;
}
s.request(n);
}
}
s.request(n);
}
private boolean emitCachedSignals() {
@ -259,7 +273,7 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
}
// Publisher<T> methods (we're the Publisher to the write subscriber)...
// Publisher<T> methods (we're the Publisher to the writeSubscriber)..
@Override
public void subscribe(Subscriber<? super T> writeSubscriber) {
@ -278,14 +292,38 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
}
private class DownstreamBridge implements CoreSubscriber<Void> {
/**
* We need an extra barrier between the WriteBarrier and the actual
* completion subscriber.
*
* <p>The actual completionSubscriber is subscribed immediately to the
* WriteBarrier initially. Later after the first signal is received, we need
* this wrapper to subscribe again, this time to the write function.
*/
private class WriteCompletionBarrier implements CoreSubscriber<Void>, Subscription {
private final CoreSubscriber<? super Void> downstream;
/* Downstream write completion subscriber */
private final CoreSubscriber<? super Void> completionSubscriber;
public DownstreamBridge(CoreSubscriber<? super Void> downstream) {
this.downstream = downstream;
private final WriteBarrier writeBarrier;
public WriteCompletionBarrier(CoreSubscriber<? super Void> subscriber, WriteBarrier writeBarrier) {
this.completionSubscriber = subscriber;
this.writeBarrier = writeBarrier;
}
/**
* Connect the underlying completion subscriber to this barrier in order
* to track cancel signals and pass them on to the write barrier.
*/
public void connect() {
this.completionSubscriber.onSubscribe(this);
}
// Subscriber methods (we're the subscriber to the write function)..
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
@ -297,17 +335,28 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override
public void onError(Throwable ex) {
this.downstream.onError(ex);
this.completionSubscriber.onError(ex);
}
@Override
public void onComplete() {
this.downstream.onComplete();
this.completionSubscriber.onComplete();
}
@Override
public Context currentContext() {
return this.downstream.currentContext();
return this.completionSubscriber.currentContext();
}
@Override
public void request(long n) {
// Ignore: we don't produce data
}
@Override
public void cancel() {
this.writeBarrier.cancel();
}
}