From cd643704adf1cb3787736452b3c7a96897ecde1c Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Thu, 29 Jun 2017 10:46:25 -0700 Subject: [PATCH] Update to reactor core 3.1.0.B-S with context update for monoSendOperator Explicit typing for older jvm compilation in BodyExtractor --- build.gradle | 3 ++- .../server/reactive/ChannelSendOperator.java | 24 +++++++++++++++---- .../web/reactive/function/BodyExtractors.java | 4 ++-- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/build.gradle b/build.gradle index 21c18403b7..0fde270625 100644 --- a/build.gradle +++ b/build.gradle @@ -83,7 +83,7 @@ configure(allprojects) { project -> ext.poiVersion = "3.16" ext.protobufVersion = "3.3.1" ext.quartzVersion = "2.3.0" - ext.reactorVersion = "Bismuth-M2" + ext.reactorVersion = "Bismuth-BUILD-SNAPSHOT" ext.romeVersion = "1.7.3" ext.rxjavaVersion = '1.3.0' ext.rxjavaAdapterVersion = '1.2.1' @@ -176,6 +176,7 @@ configure(allprojects) { project -> repositories { maven { url "https://repo.spring.io/libs-release" } maven { url "https://repo.spring.io/milestone" } + maven { url "https://repo.spring.io/snapshot" } } dependencies { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index afdddb76f9..3ae420f8b1 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -21,9 +21,13 @@ import java.util.function.Function; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import reactor.core.publisher.MonoSource; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; +import reactor.util.context.Context; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -38,20 +42,30 @@ import org.springframework.util.Assert; * @author Stephane Maldini * @since 5.0 */ -public class ChannelSendOperator extends MonoSource { +public class ChannelSendOperator extends Mono implements Scannable { private final Function, Publisher> writeFunction; + private final Flux source; public ChannelSendOperator(Publisher source, Function, Publisher> writeFunction) { - super(source); + this.source = Flux.from(source); this.writeFunction = writeFunction; } + @Override + @Nullable + @SuppressWarnings("rawtypes") + public Object scanUnsafe(Attr key) { + if (key == IntAttr.PREFETCH) return Integer.MAX_VALUE; + if (key == ScannableAttr.PARENT) return source; + return null; + } + @Override - public void subscribe(Subscriber s) { - this.source.subscribe(new WriteWithBarrier(s)); + public void subscribe(Subscriber s, Context ctx) { + this.source.subscribe(new WriteWithBarrier(s), ctx); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java index 0901a50452..acb0dab440 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java @@ -93,7 +93,7 @@ public abstract class BodyExtractors { Assert.notNull(elementType, "'elementType' must not be null"); return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, elementType, - reader -> { + (HttpMessageReader reader) -> { Optional serverResponse = context.serverResponse(); if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { return reader.readMono(elementType, elementType, (ServerHttpRequest) inputMessage, @@ -142,7 +142,7 @@ public abstract class BodyExtractors { Assert.notNull(elementType, "'elementType' must not be null"); return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, elementType, - reader -> { + (HttpMessageReader reader) -> { Optional serverResponse = context.serverResponse(); if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { return reader.read(elementType, elementType, (ServerHttpRequest) inputMessage,