From 921bf5fb7076a6836cac1fa7cf421d2d76827899 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Thu, 20 Oct 2016 14:02:19 +0200 Subject: [PATCH] Use the official RxJava to Reactive Streams adapter This commit removes the usage of Reactor adapters (about to be moved from Reactor Core to a new Reactor Adapter module). Instead, RxReactiveStreams is now used for adapting RxJava 1 and Flowable methods are used for RxJava 2. Issue: SPR-14824 --- build.gradle | 4 +++ .../core/ReactiveAdapterRegistry.java | 36 ++++++++++--------- .../HttpEntityArgumentResolverTests.java | 13 ++++--- .../RequestBodyArgumentResolverTests.java | 10 +++--- .../reactive/RxNettyHttpHandlerAdapter.java | 4 +-- .../reactive/RxNettyServerHttpRequest.java | 4 +-- .../reactive/RxNettyServerHttpResponse.java | 10 +++--- 7 files changed, 43 insertions(+), 38 deletions(-) diff --git a/build.gradle b/build.gradle index f0f170c5a4..68f79e1995 100644 --- a/build.gradle +++ b/build.gradle @@ -83,6 +83,7 @@ configure(allprojects) { project -> ext.romeVersion = "1.7.0" ext.rxjavaVersion = '1.1.9' ext.rxjava2Version = '2.0.0-RC3' + ext.rxjavaAdapterVersion = '1.2.0' ext.rxnettyVersion = '0.5.2-rc.4' ext.servletVersion = "3.1.0" ext.slf4jVersion = "1.7.21" @@ -393,6 +394,7 @@ project("spring-core") { optional("io.projectreactor:reactor-core:${reactorCoreVersion}") optional "io.reactivex:rxjava:${rxjavaVersion}" optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}" + optional "io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}" optional("io.netty:netty-buffer:${nettyVersion}") testCompile("javax.xml.bind:jaxb-api:${jaxbVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") @@ -742,6 +744,7 @@ project("spring-web") { } optional("io.reactivex:rxjava:${rxjavaVersion}") optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}" + optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") optional("io.undertow:undertow-core:${undertowVersion}") optional("org.jboss.xnio:xnio-api:${xnioVersion}") optional("io.netty:netty-buffer:${nettyVersion}") // temporarily for JsonObjectDecoder @@ -827,6 +830,7 @@ project("spring-web-reactive") { } testCompile("io.reactivex:rxjava:${rxjavaVersion}") testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}" + testCompile("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") testCompile("io.undertow:undertow-core:${undertowVersion}") testCompile("org.jboss.xnio:xnio-api:${xnioVersion}") testCompile("com.fasterxml:aalto-xml:1.0.0") diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index 3feb1f5c40..7be7a41ebf 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -26,12 +26,11 @@ import java.util.function.Predicate; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import org.reactivestreams.Publisher; -import reactor.adapter.RxJava1Adapter; -import reactor.adapter.RxJava2Adapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import rx.Completable; import rx.Observable; +import rx.RxReactiveStreams; import rx.Single; import org.springframework.util.ClassUtils; @@ -51,6 +50,9 @@ public class ReactiveAdapterRegistry { private static final boolean rxJava1Present = ClassUtils.isPresent("rx.Observable", ReactiveAdapterRegistry.class.getClassLoader()); + private static final boolean rxJava1Adapter = + ClassUtils.isPresent("rx.RxReactiveStreams", ReactiveAdapterRegistry.class.getClassLoader()); + private static final boolean rxJava2Present = ClassUtils.isPresent("io.reactivex.Flowable", ReactiveAdapterRegistry.class.getClassLoader()); @@ -75,7 +77,7 @@ public class ReactiveAdapterRegistry { new ReactiveAdapter.Descriptor(false, true, false) ); - if (rxJava1Present) { + if (rxJava1Present && rxJava1Adapter) { new RxJava1AdapterRegistrar().register(this); } if (rxJava2Present) { @@ -262,17 +264,17 @@ public class ReactiveAdapterRegistry { public void register(ReactiveAdapterRegistry registry) { registry.registerFluxAdapter(Observable.class, - source -> RxJava1Adapter.observableToFlux((Observable) source), - RxJava1Adapter::publisherToObservable + source -> Flux.from(RxReactiveStreams.toPublisher((Observable) source)), + RxReactiveStreams::toObservable ); registry.registerMonoAdapter(Single.class, - source -> RxJava1Adapter.singleToMono((Single) source), - RxJava1Adapter::publisherToSingle, + source -> Mono.from(RxReactiveStreams.toPublisher((Single) source)), + RxReactiveStreams::toSingle, new ReactiveAdapter.Descriptor(false, false, false) ); registry.registerMonoAdapter(Completable.class, - source -> RxJava1Adapter.completableToMono((Completable) source), - RxJava1Adapter::publisherToCompletable, + source -> Mono.from(RxReactiveStreams.toPublisher((Completable) source)), + RxReactiveStreams::toCompletable, new ReactiveAdapter.Descriptor(false, true, true) ); } @@ -282,21 +284,21 @@ public class ReactiveAdapterRegistry { public void register(ReactiveAdapterRegistry registry) { registry.registerFluxAdapter(Flowable.class, - source -> RxJava2Adapter.flowableToFlux((Flowable) source), - RxJava2Adapter::fluxToFlowable + source -> Flux.from((Flowable) source), + source-> Flowable.fromPublisher(source) ); registry.registerFluxAdapter(io.reactivex.Observable.class, - source -> RxJava2Adapter.observableToFlux((io.reactivex.Observable) source, BackpressureStrategy.BUFFER), - RxJava2Adapter::fluxToObservable + source -> Flux.from(((io.reactivex.Observable) source).toFlowable(BackpressureStrategy.BUFFER)), + source -> Flowable.fromPublisher(source).toObservable() ); registry.registerMonoAdapter(io.reactivex.Single.class, - source -> RxJava2Adapter.singleToMono((io.reactivex.Single) source), - RxJava2Adapter::monoToSingle, + source -> Mono.from(((io.reactivex.Single) source).toFlowable()), + source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle(), new ReactiveAdapter.Descriptor(false, false, false) ); registry.registerMonoAdapter(io.reactivex.Completable.class, - source -> RxJava2Adapter.completableToMono((io.reactivex.Completable) source), - RxJava2Adapter::monoToCompletable, + source -> Mono.from(((io.reactivex.Completable) source).toFlowable()), + source -> Flowable.fromPublisher(source).toObservable().ignoreElements(), new ReactiveAdapter.Descriptor(false, true, true) ); } diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityArgumentResolverTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityArgumentResolverTests.java index 24ffa019b1..8b067ce37a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityArgumentResolverTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityArgumentResolverTests.java @@ -26,11 +26,10 @@ import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import org.junit.Before; import org.junit.Test; -import reactor.adapter.RxJava1Adapter; -import reactor.adapter.RxJava2Adapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import rx.Observable; +import rx.RxReactiveStreams; import rx.Single; import org.springframework.core.MethodParameter; @@ -151,7 +150,7 @@ public class HttpEntityArgumentResolverTests { ResolvableType type = httpEntityType(forClassWithGenerics(Single.class, String.class)); HttpEntity> entity = resolveValueWithEmptyBody(type); - TestSubscriber.subscribe(RxJava1Adapter.singleToMono(entity.getBody())) + TestSubscriber.subscribe(RxReactiveStreams.toPublisher(entity.getBody())) .assertNoValues() .assertError(ServerWebInputException.class); } @@ -161,7 +160,7 @@ public class HttpEntityArgumentResolverTests { ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class)); HttpEntity> entity = resolveValueWithEmptyBody(type); - TestSubscriber.subscribe(RxJava2Adapter.singleToMono(entity.getBody())) + TestSubscriber.subscribe(entity.getBody().toFlowable()) .assertNoValues() .assertError(ServerWebInputException.class); } @@ -171,7 +170,7 @@ public class HttpEntityArgumentResolverTests { ResolvableType type = httpEntityType(forClassWithGenerics(Observable.class, String.class)); HttpEntity> entity = resolveValueWithEmptyBody(type); - TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(entity.getBody())) + TestSubscriber.subscribe(RxReactiveStreams.toPublisher(entity.getBody())) .assertNoError() .assertComplete() .assertNoValues(); @@ -182,7 +181,7 @@ public class HttpEntityArgumentResolverTests { ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Observable.class, String.class)); HttpEntity> entity = resolveValueWithEmptyBody(type); - TestSubscriber.subscribe(RxJava2Adapter.observableToFlux(entity.getBody(), BackpressureStrategy.BUFFER)) + TestSubscriber.subscribe(entity.getBody().toFlowable(BackpressureStrategy.BUFFER)) .assertNoError() .assertComplete() .assertNoValues(); @@ -193,7 +192,7 @@ public class HttpEntityArgumentResolverTests { ResolvableType type = httpEntityType(forClassWithGenerics(Flowable.class, String.class)); HttpEntity> entity = resolveValueWithEmptyBody(type); - TestSubscriber.subscribe(RxJava2Adapter.flowableToFlux(entity.getBody())) + TestSubscriber.subscribe(entity.getBody()) .assertNoError() .assertComplete() .assertNoValues(); diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolverTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolverTests.java index d7e90477f3..e8e00a234f 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolverTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolverTests.java @@ -24,10 +24,10 @@ import java.util.function.Predicate; import org.junit.Before; import org.junit.Test; -import reactor.adapter.RxJava1Adapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import rx.Observable; +import rx.RxReactiveStreams; import rx.Single; import org.springframework.core.MethodParameter; @@ -151,12 +151,12 @@ public class RequestBodyArgumentResolverTests { ResolvableType type = forClassWithGenerics(Single.class, String.class); Single single = resolveValueWithEmptyBody(type, true); - TestSubscriber.subscribe(RxJava1Adapter.singleToMono(single)) + TestSubscriber.subscribe(RxReactiveStreams.toPublisher(single)) .assertNoValues() .assertError(ServerWebInputException.class); single = resolveValueWithEmptyBody(type, false); - TestSubscriber.subscribe(RxJava1Adapter.singleToMono(single)) + TestSubscriber.subscribe(RxReactiveStreams.toPublisher(single)) .assertNoValues() .assertError(ServerWebInputException.class); } @@ -166,12 +166,12 @@ public class RequestBodyArgumentResolverTests { ResolvableType type = forClassWithGenerics(Observable.class, String.class); Observable observable = resolveValueWithEmptyBody(type, true); - TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(observable)) + TestSubscriber.subscribe(RxReactiveStreams.toPublisher(observable)) .assertNoValues() .assertError(ServerWebInputException.class); observable = resolveValueWithEmptyBody(type, false); - TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(observable)) + TestSubscriber.subscribe(RxReactiveStreams.toPublisher(observable)) .assertNoValues() .assertComplete(); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java index af66ca4a73..f275a051aa 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java @@ -24,9 +24,9 @@ import io.reactivex.netty.protocol.http.server.RequestHandler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; -import reactor.adapter.RxJava1Adapter; import reactor.core.publisher.Mono; import rx.Observable; +import rx.RxReactiveStreams; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.util.Assert; @@ -64,7 +64,7 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler logger.debug("Successfully completed request")); - return RxJava1Adapter.publisherToObservable(result); + return RxReactiveStreams.toObservable(result); } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java index 660033f1e8..0181a3a815 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java @@ -23,9 +23,9 @@ import java.net.URISyntaxException; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.cookie.Cookie; import io.reactivex.netty.protocol.http.server.HttpServerRequest; -import reactor.adapter.RxJava1Adapter; import reactor.core.publisher.Flux; import rx.Observable; +import rx.RxReactiveStreams; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -106,7 +106,7 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest { @Override public Flux getBody() { Observable content = this.request.getContent().map(dataBufferFactory::wrap); - return RxJava1Adapter.observableToFlux(content); + return Flux.from(RxReactiveStreams.toPublisher(content)); } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index e0c04346ee..8135a3af8b 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -24,10 +24,10 @@ import io.netty.handler.codec.http.cookie.DefaultCookie; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.ResponseContentWriter; import org.reactivestreams.Publisher; -import reactor.adapter.RxJava1Adapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import rx.Observable; +import rx.RxReactiveStreams; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -72,9 +72,9 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { @Override protected Mono writeWithInternal(Publisher body) { - Observable content = RxJava1Adapter.publisherToObservable(body) + Observable content = RxReactiveStreams.toObservable(body) .map(NettyDataBufferFactory::toByteBuf); - return RxJava1Adapter.observableToFlux(this.response.write(content)) + return Flux.from(RxReactiveStreams.toPublisher(this.response.write(content))) .then(); } @@ -85,9 +85,9 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { flatMap(publisher -> Flux.from(publisher). map(NettyDataBufferFactory::toByteBuf). concatWith(Mono.just(FLUSH_SIGNAL))); - Observable content = RxJava1Adapter.publisherToObservable(bodyWithFlushSignals); + Observable content = RxReactiveStreams.toObservable(bodyWithFlushSignals); ResponseContentWriter writer = this.response.write(content, bb -> bb == FLUSH_SIGNAL); - return RxJava1Adapter.observableToFlux(writer).then(); + return Flux.from(RxReactiveStreams.toPublisher(writer)).then(); } @Override