Update to reactor core 3.1.0.B-S with context update for monoSendOperator

Explicit typing for older jvm compilation in BodyExtractor
This commit is contained in:
Stephane Maldini 2017-06-29 10:46:25 -07:00
parent 67330dfc23
commit cd643704ad
3 changed files with 23 additions and 8 deletions

View File

@ -83,7 +83,7 @@ configure(allprojects) { project ->
ext.poiVersion = "3.16" ext.poiVersion = "3.16"
ext.protobufVersion = "3.3.1" ext.protobufVersion = "3.3.1"
ext.quartzVersion = "2.3.0" ext.quartzVersion = "2.3.0"
ext.reactorVersion = "Bismuth-M2" ext.reactorVersion = "Bismuth-BUILD-SNAPSHOT"
ext.romeVersion = "1.7.3" ext.romeVersion = "1.7.3"
ext.rxjavaVersion = '1.3.0' ext.rxjavaVersion = '1.3.0'
ext.rxjavaAdapterVersion = '1.2.1' ext.rxjavaAdapterVersion = '1.2.1'
@ -176,6 +176,7 @@ configure(allprojects) { project ->
repositories { repositories {
maven { url "https://repo.spring.io/libs-release" } maven { url "https://repo.spring.io/libs-release" }
maven { url "https://repo.spring.io/milestone" } maven { url "https://repo.spring.io/milestone" }
maven { url "https://repo.spring.io/snapshot" }
} }
dependencies { dependencies {

View File

@ -21,9 +21,13 @@ import java.util.function.Function;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import reactor.core.publisher.MonoSource; import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators; import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
@ -38,20 +42,30 @@ import org.springframework.util.Assert;
* @author Stephane Maldini * @author Stephane Maldini
* @since 5.0 * @since 5.0
*/ */
public class ChannelSendOperator<T> extends MonoSource<T, Void> { public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
private final Function<Publisher<T>, Publisher<Void>> writeFunction; private final Function<Publisher<T>, Publisher<Void>> writeFunction;
private final Flux<T> source;
public ChannelSendOperator(Publisher<? extends T> source, Function<Publisher<T>, Publisher<Void>> writeFunction) { public ChannelSendOperator(Publisher<? extends T> source, Function<Publisher<T>, Publisher<Void>> writeFunction) {
super(source); this.source = Flux.from(source);
this.writeFunction = writeFunction; this.writeFunction = writeFunction;
} }
@Override
@Nullable
@SuppressWarnings("rawtypes")
public Object scanUnsafe(Attr key) {
if (key == IntAttr.PREFETCH) return Integer.MAX_VALUE;
if (key == ScannableAttr.PARENT) return source;
return null;
}
@Override @Override
public void subscribe(Subscriber<? super Void> s) { public void subscribe(Subscriber<? super Void> s, Context ctx) {
this.source.subscribe(new WriteWithBarrier(s)); this.source.subscribe(new WriteWithBarrier(s), ctx);
} }

View File

@ -93,7 +93,7 @@ public abstract class BodyExtractors {
Assert.notNull(elementType, "'elementType' must not be null"); Assert.notNull(elementType, "'elementType' must not be null");
return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, return (inputMessage, context) -> readWithMessageReaders(inputMessage, context,
elementType, elementType,
reader -> { (HttpMessageReader<T> reader) -> {
Optional<ServerHttpResponse> serverResponse = context.serverResponse(); Optional<ServerHttpResponse> serverResponse = context.serverResponse();
if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) {
return reader.readMono(elementType, elementType, (ServerHttpRequest) inputMessage, return reader.readMono(elementType, elementType, (ServerHttpRequest) inputMessage,
@ -142,7 +142,7 @@ public abstract class BodyExtractors {
Assert.notNull(elementType, "'elementType' must not be null"); Assert.notNull(elementType, "'elementType' must not be null");
return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, return (inputMessage, context) -> readWithMessageReaders(inputMessage, context,
elementType, elementType,
reader -> { (HttpMessageReader<T> reader) -> {
Optional<ServerHttpResponse> serverResponse = context.serverResponse(); Optional<ServerHttpResponse> serverResponse = context.serverResponse();
if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) {
return reader.read(elementType, elementType, (ServerHttpRequest) inputMessage, return reader.read(elementType, elementType, (ServerHttpRequest) inputMessage,