diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index bf79e543915..42dbf1c200f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; @@ -58,10 +57,8 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { @Override public Mono setBody(Publisher publisher) { - return Flux.from(publisher) - .lift(new WriteWithOperator<>(writePublisher -> - applyBeforeCommit().after(() -> setBodyInternal(writePublisher)))) - .after(); + return new WriteWithOperator<>(publisher, writePublisher -> + applyBeforeCommit().after(() -> setBodyInternal(writePublisher))); } private Mono applyBeforeCommit() { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/WriteWithOperator.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/WriteWithOperator.java index 72cd43cb57d..44d15830c8f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/WriteWithOperator.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/WriteWithOperator.java @@ -20,10 +20,12 @@ import java.util.function.Function; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.publisher.MonoSource; import reactor.core.subscriber.SubscriberBarrier; -import reactor.core.util.Assert; import reactor.core.util.EmptySubscription; +import org.springframework.util.Assert; + /** * Given a write function that accepts a source {@code Publisher} to write * with and returns {@code Publisher} for the result, this operator helps @@ -33,22 +35,24 @@ import reactor.core.util.EmptySubscription; * through the result publisher. Otherwise the write function is invoked. * * @author Rossen Stoyanchev + * @author Stephane Maldini */ -public class WriteWithOperator implements Function, Subscriber> { +public class WriteWithOperator extends MonoSource { private final Function, Publisher> writeFunction; - public WriteWithOperator(Function, Publisher> writeFunction) { + public WriteWithOperator(Publisher source, + Function, Publisher> writeFunction) { + super(source); this.writeFunction = writeFunction; } @Override - public Subscriber apply(Subscriber subscriber) { - return new WriteWithBarrier(subscriber); + public void subscribe(Subscriber s) { + source.subscribe(new WriteWithBarrier(s)); } - private class WriteWithBarrier extends SubscriberBarrier implements Publisher { /** diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java index 8c0b57d9694..0e5af148dd1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java @@ -145,7 +145,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered publisher = this.conversionService.convert(value.get(), Publisher.class); elementType = returnType.getGeneric(0); if (Void.class.equals(elementType.getRawClass())) { - return (Mono)Mono.from(publisher); + return Mono.from((Publisher)publisher); } } else { diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java index 56a2e02655f..9f6d8f96028 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java @@ -28,6 +28,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.subscriber.SubscriberBarrier; import reactor.rx.Fluxion; import reactor.rx.Signal; @@ -36,25 +37,27 @@ import static org.junit.Assert.*; /** * @author Rossen Stoyanchev + * @author Stephane Maldini */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public class WriteWithOperatorTests { private OneByOneAsyncWriter writer; - private WriteWithOperator operator; - @Before public void setUp() throws Exception { this.writer = new OneByOneAsyncWriter(); - this.operator = new WriteWithOperator<>(this.writer::writeWith); + } + + private Mono writeWithOperator(Publisher source){ + return new WriteWithOperator<>(source, writer::writeWith); } @Test public void errorBeforeFirstItem() throws Exception { IllegalStateException error = new IllegalStateException("boo"); - Publisher completion = Flux.error(error).lift(this.operator); + Mono completion = Mono.error(error).as(this::writeWithOperator); List> signals = Fluxion.from(completion).materialize().toList().get(); assertEquals(1, signals.size()); @@ -63,7 +66,7 @@ public class WriteWithOperatorTests { @Test public void completionBeforeFirstItem() throws Exception { - Publisher completion = Flux.empty().lift(this.operator); + Mono completion = Flux.empty().as(this::writeWithOperator); List> signals = Fluxion.from(completion).materialize().toList().get(); assertEquals(1, signals.size()); @@ -75,8 +78,8 @@ public class WriteWithOperatorTests { @Test public void writeOneItem() throws Exception { - Publisher completion = Flux.just("one").lift(this.operator); - List> signals = Fluxion.from(completion).materialize().toList().get(); + Mono completion = Flux.just("one").as(this::writeWithOperator); + List> signals =completion.as(Fluxion::from).materialize().toList().get(); assertEquals(1, signals.size()); assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete()); @@ -90,8 +93,8 @@ public class WriteWithOperatorTests { @Test public void writeMultipleItems() throws Exception { List items = Arrays.asList("one", "two", "three"); - Publisher completion = Flux.fromIterable(items).lift(this.operator); - List> signals = Fluxion.from(completion).materialize().toList().get(); + Mono completion = Flux.fromIterable(items).as(this::writeWithOperator); + List> signals = completion.as(Fluxion::from).materialize().toList().get(); assertEquals(1, signals.size()); assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete()); @@ -113,8 +116,8 @@ public class WriteWithOperatorTests { subscriber.onError(error); } }, subscriber -> new AtomicInteger()); - Publisher completion = publisher.lift(this.operator); - List> signals = Fluxion.from(completion).materialize().toList().get(); + Mono completion = publisher.as(this::writeWithOperator); + List> signals = completion.as(Fluxion::from).materialize().toList().get(); assertEquals(1, signals.size()); assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());