Arrange WriteWithOperator to save some allocation cost with the same technique used by RSC and Reactor

This commit is contained in:
Stephane Maldini 2016-02-28 11:49:00 +00:00
parent 423a4852c5
commit 4197f002d8
4 changed files with 27 additions and 23 deletions

View File

@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
@ -58,10 +57,8 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
@Override
public Mono<Void> setBody(Publisher<DataBuffer> publisher) {
return Flux.from(publisher)
.lift(new WriteWithOperator<>(writePublisher ->
applyBeforeCommit().after(() -> setBodyInternal(writePublisher))))
.after();
return new WriteWithOperator<>(publisher, writePublisher ->
applyBeforeCommit().after(() -> setBodyInternal(writePublisher)));
}
private Mono<Void> applyBeforeCommit() {

View File

@ -20,10 +20,12 @@ import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.MonoSource;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.core.util.Assert;
import reactor.core.util.EmptySubscription;
import org.springframework.util.Assert;
/**
* Given a write function that accepts a source {@code Publisher<T>} to write
* with and returns {@code Publisher<Void>} for the result, this operator helps
@ -33,22 +35,24 @@ import reactor.core.util.EmptySubscription;
* through the result publisher. Otherwise the write function is invoked.
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
public class WriteWithOperator<T> implements Function<Subscriber<? super Void>, Subscriber<? super T>> {
public class WriteWithOperator<T> extends MonoSource<T, Void> {
private final Function<Publisher<T>, Publisher<Void>> writeFunction;
public WriteWithOperator(Function<Publisher<T>, Publisher<Void>> writeFunction) {
public WriteWithOperator(Publisher<? extends T> source,
Function<Publisher<T>, Publisher<Void>> writeFunction) {
super(source);
this.writeFunction = writeFunction;
}
@Override
public Subscriber<? super T> apply(Subscriber<? super Void> subscriber) {
return new WriteWithBarrier(subscriber);
public void subscribe(Subscriber<? super Void> s) {
source.subscribe(new WriteWithBarrier(s));
}
private class WriteWithBarrier extends SubscriberBarrier<T, Void> implements Publisher<T> {
/**

View File

@ -145,7 +145,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
publisher = this.conversionService.convert(value.get(), Publisher.class);
elementType = returnType.getGeneric(0);
if (Void.class.equals(elementType.getRawClass())) {
return (Mono<Void>)Mono.from(publisher);
return Mono.from((Publisher<Void>)publisher);
}
}
else {

View File

@ -28,6 +28,7 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.rx.Fluxion;
import reactor.rx.Signal;
@ -36,25 +37,27 @@ import static org.junit.Assert.*;
/**
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public class WriteWithOperatorTests {
private OneByOneAsyncWriter writer;
private WriteWithOperator<String> operator;
@Before
public void setUp() throws Exception {
this.writer = new OneByOneAsyncWriter();
this.operator = new WriteWithOperator<>(this.writer::writeWith);
}
private <T> Mono<Void> writeWithOperator(Publisher<String> source){
return new WriteWithOperator<>(source, writer::writeWith);
}
@Test
public void errorBeforeFirstItem() throws Exception {
IllegalStateException error = new IllegalStateException("boo");
Publisher<Void> completion = Flux.<String>error(error).lift(this.operator);
Mono<Void> completion = Mono.<String>error(error).as(this::writeWithOperator);
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
@ -63,7 +66,7 @@ public class WriteWithOperatorTests {
@Test
public void completionBeforeFirstItem() throws Exception {
Publisher<Void> completion = Flux.<String>empty().lift(this.operator);
Mono<Void> completion = Flux.<String>empty().as(this::writeWithOperator);
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
@ -75,8 +78,8 @@ public class WriteWithOperatorTests {
@Test
public void writeOneItem() throws Exception {
Publisher<Void> completion = Flux.just("one").lift(this.operator);
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
Mono<Void> completion = Flux.just("one").as(this::writeWithOperator);
List<Signal<Void>> signals =completion.as(Fluxion::from).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -90,8 +93,8 @@ public class WriteWithOperatorTests {
@Test
public void writeMultipleItems() throws Exception {
List<String> items = Arrays.asList("one", "two", "three");
Publisher<Void> completion = Flux.fromIterable(items).lift(this.operator);
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
Mono<Void> completion = Flux.fromIterable(items).as(this::writeWithOperator);
List<Signal<Void>> signals = completion.as(Fluxion::from).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -113,8 +116,8 @@ public class WriteWithOperatorTests {
subscriber.onError(error);
}
}, subscriber -> new AtomicInteger());
Publisher<Void> completion = publisher.lift(this.operator);
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
Mono<Void> completion = publisher.as(this::writeWithOperator);
List<Signal<Void>> signals = completion.as(Fluxion::from).materialize().toList().get();
assertEquals(1, signals.size());
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());