Eliminate need for reactor stream in CompletableFutureUtils + fixes

This commit is contained in:
Stephane Maldini 2015-10-16 09:11:22 +02:00 committed by Sebastien Deleuze
parent 80f9a21b9d
commit 06a1ddbe93
1 changed files with 51 additions and 42 deletions

View File

@ -16,28 +16,28 @@
package org.springframework.reactive.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.error.Exceptions;
import reactor.core.error.SpecificationExceptions;
import reactor.core.support.BackpressureUtils;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
import org.springframework.util.Assert;
import reactor.Publishers;
import reactor.core.error.CancelException;
import reactor.core.error.Exceptions;
import reactor.core.support.BackpressureUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
* @author Sebastien Deleuze
* @author Stephane Maldini
*/
public class CompletableFutureUtils {
public static <T> Publisher<T> toPublisher(CompletableFuture<T> future) {
return new CompletableFutureStream<T>(future);
return new CompletableFuturePublisher<T>(future);
}
public static <T> CompletableFuture<List<T>> fromPublisher(Publisher<T> publisher) {
@ -97,46 +97,55 @@ public class CompletableFutureUtils {
return future;
}
private static class CompletableFutureStream<T> extends Stream<T> {
private static class CompletableFuturePublisher<T> implements Publisher<T> {
private final CompletableFuture<? extends T> future;
private final Publisher<? extends T> futurePublisher;
public CompletableFutureStream(CompletableFuture<? extends T> future) {
@SuppressWarnings("unused")
private volatile long requested;
private static final AtomicLongFieldUpdater<CompletableFuturePublisher> REQUESTED =
AtomicLongFieldUpdater.newUpdater(CompletableFuturePublisher.class, "requested");
public CompletableFuturePublisher(CompletableFuture<? extends T> future) {
this.future = future;
this.futurePublisher = Publishers.createWithDemand((n, sub) -> {
if (!BackpressureUtils.checkRequest(n, sub)) {
return;
}
if(BackpressureUtils.getAndAdd(REQUESTED, CompletableFuturePublisher.this, n) > 0) {
return;
}
future.whenComplete((result, error) -> {
if (error != null) {
sub.onError(error);
} else {
sub.onNext(result);
sub.onComplete();
}
});
}, null, nothing -> {
if(!future.isDone()){
future.cancel(true);
}
});
}
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
try {
subscriber.onSubscribe(new ReactiveSubscription<T>(this, subscriber) {
@Override
public void request(long elements) {
try{
BackpressureUtils.checkRequest(elements);
}
catch (SpecificationExceptions.Spec309_NullOrNegativeRequest iae) {
subscriber.onError(iae);
return;
}
if (isComplete()) {
return;
}
try {
future.whenComplete((result, error) -> {
if (error != null) {
onError(error);
} else {
subscriber.onNext(result);
onComplete();
}
});
}
catch (Throwable e) {
onError(e);
}
}
});
if (future.isDone()) {
Publishers.just(future.get()).subscribe(subscriber);
}
else if ( future.isCancelled()){
Exceptions.publisher(CancelException.get());
}
else {
futurePublisher.subscribe(subscriber);
}
}
catch (Throwable throwable) {
Exceptions.publisher(throwable);