use EmptySubscription
This commit is contained in:
parent
fc1b2e96f7
commit
423a4852c5
|
|
@ -22,7 +22,7 @@ import org.reactivestreams.Subscriber;
|
|||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.subscriber.SubscriberBarrier;
|
||||
import reactor.core.util.Assert;
|
||||
|
||||
import reactor.core.util.EmptySubscription;
|
||||
|
||||
/**
|
||||
* Given a write function that accepts a source {@code Publisher<T>} to write
|
||||
|
|
@ -36,10 +36,10 @@ import reactor.core.util.Assert;
|
|||
*/
|
||||
public class WriteWithOperator<T> implements Function<Subscriber<? super Void>, Subscriber<? super T>> {
|
||||
|
||||
private final java.util.function.Function<Publisher<T>, Publisher<Void>> writeFunction;
|
||||
private final Function<Publisher<T>, Publisher<Void>> writeFunction;
|
||||
|
||||
|
||||
public WriteWithOperator(java.util.function.Function<Publisher<T>, Publisher<Void>> writeFunction) {
|
||||
public WriteWithOperator(Function<Publisher<T>, Publisher<Void>> writeFunction) {
|
||||
this.writeFunction = writeFunction;
|
||||
}
|
||||
|
||||
|
|
@ -156,7 +156,7 @@ public class WriteWithOperator<T> implements Function<Subscriber<? super Void>,
|
|||
this.writeSubscriber = writeSubscriber;
|
||||
|
||||
if (this.error != null || this.completed) {
|
||||
this.writeSubscriber.onSubscribe(NO_OP_SUBSCRIPTION);
|
||||
this.writeSubscriber.onSubscribe(EmptySubscription.INSTANCE);
|
||||
emitCachedSignals();
|
||||
}
|
||||
else {
|
||||
|
|
@ -234,15 +234,4 @@ public class WriteWithOperator<T> implements Function<Subscriber<? super Void>,
|
|||
}
|
||||
}
|
||||
|
||||
private final static Subscription NO_OP_SUBSCRIPTION = new Subscription() {
|
||||
|
||||
@Override
|
||||
public void request(long n) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue