Polish ChannelSendOperator

This commit is contained in:
Rossen Stoyanchev 2017-07-12 12:21:04 +02:00
parent b3a05ebac9
commit 42188020b4
1 changed files with 30 additions and 24 deletions

View File

@ -50,7 +50,9 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
private final Flux<T> source; private final Flux<T> source;
public ChannelSendOperator(Publisher<? extends T> source, Function<Publisher<T>, Publisher<Void>> writeFunction) { public ChannelSendOperator(Publisher<? extends T> source, Function<Publisher<T>,
Publisher<Void>> writeFunction) {
this.source = Flux.from(source); this.source = Flux.from(source);
this.writeFunction = writeFunction; this.writeFunction = writeFunction;
} }
@ -79,8 +81,10 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
private final class WriteWithBarrier private final class WriteWithBarrier
implements Publisher<T>, CoreSubscriber<T>, Subscription { implements Publisher<T>, CoreSubscriber<T>, Subscription {
private final CoreSubscriber<? super Void> subscriber; /* Downstream write completion subscriber */
private final CoreSubscriber<? super Void> completionSubscriber;
/* Upstream write source subscription */
@Nullable @Nullable
private Subscription subscription; private Subscription subscription;
@ -109,10 +113,12 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Nullable @Nullable
private Subscriber<? super T> writeSubscriber; private Subscriber<? super T> writeSubscriber;
WriteWithBarrier(CoreSubscriber<? super Void> subscriber) {
this.subscriber = subscriber; WriteWithBarrier(CoreSubscriber<? super Void> completionSubscriber) {
this.completionSubscriber = completionSubscriber;
} }
@Override @Override
public void cancel() { public void cancel() {
Subscription s = this.subscription; Subscription s = this.subscription;
@ -124,23 +130,23 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override @Override
public Context currentContext() { public Context currentContext() {
return subscriber.currentContext(); return completionSubscriber.currentContext();
} }
@Override @Override
public final void onComplete() { public final void onComplete() {
if (this.readyToWrite) { if (this.readyToWrite) {
obtainWriteSubscriber().onComplete(); requiredWriteSubscriber().onComplete();
return; return;
} }
synchronized (this) { synchronized (this) {
if (this.readyToWrite) { if (this.readyToWrite) {
obtainWriteSubscriber().onComplete(); requiredWriteSubscriber().onComplete();
} }
else if (this.beforeFirstEmission) { else if (this.beforeFirstEmission) {
this.completed = true; this.completed = true;
this.beforeFirstEmission = false; this.beforeFirstEmission = false;
writeFunction.apply(this).subscribe(new DownstreamBridge(subscriber)); writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber));
} }
else { else {
this.completed = true; this.completed = true;
@ -151,16 +157,16 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override @Override
public final void onError(Throwable ex) { public final void onError(Throwable ex) {
if (this.readyToWrite) { if (this.readyToWrite) {
obtainWriteSubscriber().onError(ex); requiredWriteSubscriber().onError(ex);
return; return;
} }
synchronized (this) { synchronized (this) {
if (this.readyToWrite) { if (this.readyToWrite) {
obtainWriteSubscriber().onError(ex); requiredWriteSubscriber().onError(ex);
} }
else if (this.beforeFirstEmission) { else if (this.beforeFirstEmission) {
this.beforeFirstEmission = false; this.beforeFirstEmission = false;
subscriber.onError(ex); this.completionSubscriber.onError(ex);
} }
else { else {
this.error = ex; this.error = ex;
@ -171,24 +177,24 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override @Override
public final void onNext(T item) { public final void onNext(T item) {
if (this.readyToWrite) { if (this.readyToWrite) {
obtainWriteSubscriber().onNext(item); requiredWriteSubscriber().onNext(item);
return; return;
} }
//FIXME revisit in case of reentrant sync deadlock //FIXME revisit in case of reentrant sync deadlock
synchronized (this) { synchronized (this) {
if (this.readyToWrite) { if (this.readyToWrite) {
obtainWriteSubscriber().onNext(item); requiredWriteSubscriber().onNext(item);
} }
else if (this.beforeFirstEmission) { else if (this.beforeFirstEmission) {
this.item = item; this.item = item;
this.beforeFirstEmission = false; this.beforeFirstEmission = false;
writeFunction.apply(this).subscribe(new DownstreamBridge(subscriber)); writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber));
} }
else { else {
if (this.subscription != null) { if (this.subscription != null) {
this.subscription.cancel(); this.subscription.cancel();
} }
subscriber.onError(new IllegalStateException("Unexpected item.")); this.completionSubscriber.onError(new IllegalStateException("Unexpected item."));
} }
} }
} }
@ -197,8 +203,8 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
public final void onSubscribe(Subscription s) { public final void onSubscribe(Subscription s) {
if (Operators.validate(this.subscription, s)) { if (Operators.validate(this.subscription, s)) {
this.subscription = s; this.subscription = s;
this.subscriber.onSubscribe(this); this.completionSubscriber.onSubscribe(this);
s.request(1); // bypass doRequest s.request(1);
} }
} }
@ -223,14 +229,14 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
*/ */
private boolean emitCachedSignals() { private boolean emitCachedSignals() {
if (this.item != null) { if (this.item != null) {
obtainWriteSubscriber().onNext(this.item); requiredWriteSubscriber().onNext(this.item);
} }
if (this.error != null) { if (this.error != null) {
obtainWriteSubscriber().onError(this.error); requiredWriteSubscriber().onError(this.error);
return true; return true;
} }
if (this.completed) { if (this.completed) {
obtainWriteSubscriber().onComplete(); requiredWriteSubscriber().onComplete();
return true; return true;
} }
return false; return false;
@ -242,13 +248,13 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
if (s == null) { if (s == null) {
return; return;
} }
if (readyToWrite) { if (this.readyToWrite) {
s.request(n); s.request(n);
return; return;
} }
synchronized (this) { synchronized (this) {
if (this.writeSubscriber != null) { if (this.writeSubscriber != null) {
readyToWrite = true; this.readyToWrite = true;
if (emitCachedSignals()) { if (emitCachedSignals()) {
return; return;
} }
@ -261,7 +267,7 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
s.request(n); s.request(n);
} }
private Subscriber<? super T> obtainWriteSubscriber() { private Subscriber<? super T> requiredWriteSubscriber() {
Assert.state(this.writeSubscriber != null, "No write subscriber"); Assert.state(this.writeSubscriber != null, "No write subscriber");
return this.writeSubscriber; return this.writeSubscriber;
} }
@ -297,7 +303,7 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@Override @Override
public Context currentContext() { public Context currentContext() {
return downstream.currentContext(); return this.downstream.currentContext();
} }
} }