Use the official RxJava to Reactive Streams adapter

This commit removes the usage of Reactor adapters (about to
be moved from Reactor Core to a new Reactor Adapter module).
Instead, RxReactiveStreams is now used for adapting RxJava
1 and Flowable methods are used for RxJava 2.

Issue: SPR-14824
This commit is contained in:
Sebastien Deleuze 2016-10-20 14:02:19 +02:00
parent 7e907c1e83
commit 921bf5fb70
7 changed files with 43 additions and 38 deletions

View File

@ -83,6 +83,7 @@ configure(allprojects) { project ->
ext.romeVersion = "1.7.0"
ext.rxjavaVersion = '1.1.9'
ext.rxjava2Version = '2.0.0-RC3'
ext.rxjavaAdapterVersion = '1.2.0'
ext.rxnettyVersion = '0.5.2-rc.4'
ext.servletVersion = "3.1.0"
ext.slf4jVersion = "1.7.21"
@ -393,6 +394,7 @@ project("spring-core") {
optional("io.projectreactor:reactor-core:${reactorCoreVersion}")
optional "io.reactivex:rxjava:${rxjavaVersion}"
optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
optional "io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}"
optional("io.netty:netty-buffer:${nettyVersion}")
testCompile("javax.xml.bind:jaxb-api:${jaxbVersion}")
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
@ -742,6 +744,7 @@ project("spring-web") {
}
optional("io.reactivex:rxjava:${rxjavaVersion}")
optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("io.undertow:undertow-core:${undertowVersion}")
optional("org.jboss.xnio:xnio-api:${xnioVersion}")
optional("io.netty:netty-buffer:${nettyVersion}") // temporarily for JsonObjectDecoder
@ -827,6 +830,7 @@ project("spring-web-reactive") {
}
testCompile("io.reactivex:rxjava:${rxjavaVersion}")
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
testCompile("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
testCompile("io.undertow:undertow-core:${undertowVersion}")
testCompile("org.jboss.xnio:xnio-api:${xnioVersion}")
testCompile("com.fasterxml:aalto-xml:1.0.0")

View File

@ -26,12 +26,11 @@ import java.util.function.Predicate;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
import reactor.adapter.RxJava1Adapter;
import reactor.adapter.RxJava2Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.util.ClassUtils;
@ -51,6 +50,9 @@ public class ReactiveAdapterRegistry {
private static final boolean rxJava1Present =
ClassUtils.isPresent("rx.Observable", ReactiveAdapterRegistry.class.getClassLoader());
private static final boolean rxJava1Adapter =
ClassUtils.isPresent("rx.RxReactiveStreams", ReactiveAdapterRegistry.class.getClassLoader());
private static final boolean rxJava2Present =
ClassUtils.isPresent("io.reactivex.Flowable", ReactiveAdapterRegistry.class.getClassLoader());
@ -75,7 +77,7 @@ public class ReactiveAdapterRegistry {
new ReactiveAdapter.Descriptor(false, true, false)
);
if (rxJava1Present) {
if (rxJava1Present && rxJava1Adapter) {
new RxJava1AdapterRegistrar().register(this);
}
if (rxJava2Present) {
@ -262,17 +264,17 @@ public class ReactiveAdapterRegistry {
public void register(ReactiveAdapterRegistry registry) {
registry.registerFluxAdapter(Observable.class,
source -> RxJava1Adapter.observableToFlux((Observable<?>) source),
RxJava1Adapter::publisherToObservable
source -> Flux.from(RxReactiveStreams.toPublisher((Observable<?>) source)),
RxReactiveStreams::toObservable
);
registry.registerMonoAdapter(Single.class,
source -> RxJava1Adapter.singleToMono((Single<?>) source),
RxJava1Adapter::publisherToSingle,
source -> Mono.from(RxReactiveStreams.toPublisher((Single<?>) source)),
RxReactiveStreams::toSingle,
new ReactiveAdapter.Descriptor(false, false, false)
);
registry.registerMonoAdapter(Completable.class,
source -> RxJava1Adapter.completableToMono((Completable) source),
RxJava1Adapter::publisherToCompletable,
source -> Mono.from(RxReactiveStreams.toPublisher((Completable) source)),
RxReactiveStreams::toCompletable,
new ReactiveAdapter.Descriptor(false, true, true)
);
}
@ -282,21 +284,21 @@ public class ReactiveAdapterRegistry {
public void register(ReactiveAdapterRegistry registry) {
registry.registerFluxAdapter(Flowable.class,
source -> RxJava2Adapter.flowableToFlux((Flowable<?>) source),
RxJava2Adapter::fluxToFlowable
source -> Flux.from((Flowable<?>) source),
source-> Flowable.fromPublisher(source)
);
registry.registerFluxAdapter(io.reactivex.Observable.class,
source -> RxJava2Adapter.observableToFlux((io.reactivex.Observable<?>) source, BackpressureStrategy.BUFFER),
RxJava2Adapter::fluxToObservable
source -> Flux.from(((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER)),
source -> Flowable.fromPublisher(source).toObservable()
);
registry.registerMonoAdapter(io.reactivex.Single.class,
source -> RxJava2Adapter.singleToMono((io.reactivex.Single<?>) source),
RxJava2Adapter::monoToSingle,
source -> Mono.from(((io.reactivex.Single<?>) source).toFlowable()),
source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle(),
new ReactiveAdapter.Descriptor(false, false, false)
);
registry.registerMonoAdapter(io.reactivex.Completable.class,
source -> RxJava2Adapter.completableToMono((io.reactivex.Completable) source),
RxJava2Adapter::monoToCompletable,
source -> Mono.from(((io.reactivex.Completable) source).toFlowable()),
source -> Flowable.fromPublisher(source).toObservable().ignoreElements(),
new ReactiveAdapter.Descriptor(false, true, true)
);
}

View File

@ -26,11 +26,10 @@ import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import org.junit.Before;
import org.junit.Test;
import reactor.adapter.RxJava1Adapter;
import reactor.adapter.RxJava2Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.core.MethodParameter;
@ -151,7 +150,7 @@ public class HttpEntityArgumentResolverTests {
ResolvableType type = httpEntityType(forClassWithGenerics(Single.class, String.class));
HttpEntity<Single<String>> entity = resolveValueWithEmptyBody(type);
TestSubscriber.subscribe(RxJava1Adapter.singleToMono(entity.getBody()))
TestSubscriber.subscribe(RxReactiveStreams.toPublisher(entity.getBody()))
.assertNoValues()
.assertError(ServerWebInputException.class);
}
@ -161,7 +160,7 @@ public class HttpEntityArgumentResolverTests {
ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class));
HttpEntity<io.reactivex.Single<String>> entity = resolveValueWithEmptyBody(type);
TestSubscriber.subscribe(RxJava2Adapter.singleToMono(entity.getBody()))
TestSubscriber.subscribe(entity.getBody().toFlowable())
.assertNoValues()
.assertError(ServerWebInputException.class);
}
@ -171,7 +170,7 @@ public class HttpEntityArgumentResolverTests {
ResolvableType type = httpEntityType(forClassWithGenerics(Observable.class, String.class));
HttpEntity<Observable<String>> entity = resolveValueWithEmptyBody(type);
TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(entity.getBody()))
TestSubscriber.subscribe(RxReactiveStreams.toPublisher(entity.getBody()))
.assertNoError()
.assertComplete()
.assertNoValues();
@ -182,7 +181,7 @@ public class HttpEntityArgumentResolverTests {
ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Observable.class, String.class));
HttpEntity<io.reactivex.Observable<String>> entity = resolveValueWithEmptyBody(type);
TestSubscriber.subscribe(RxJava2Adapter.observableToFlux(entity.getBody(), BackpressureStrategy.BUFFER))
TestSubscriber.subscribe(entity.getBody().toFlowable(BackpressureStrategy.BUFFER))
.assertNoError()
.assertComplete()
.assertNoValues();
@ -193,7 +192,7 @@ public class HttpEntityArgumentResolverTests {
ResolvableType type = httpEntityType(forClassWithGenerics(Flowable.class, String.class));
HttpEntity<Flowable<String>> entity = resolveValueWithEmptyBody(type);
TestSubscriber.subscribe(RxJava2Adapter.flowableToFlux(entity.getBody()))
TestSubscriber.subscribe(entity.getBody())
.assertNoError()
.assertComplete()
.assertNoValues();

View File

@ -24,10 +24,10 @@ import java.util.function.Predicate;
import org.junit.Before;
import org.junit.Test;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.core.MethodParameter;
@ -151,12 +151,12 @@ public class RequestBodyArgumentResolverTests {
ResolvableType type = forClassWithGenerics(Single.class, String.class);
Single<String> single = resolveValueWithEmptyBody(type, true);
TestSubscriber.subscribe(RxJava1Adapter.singleToMono(single))
TestSubscriber.subscribe(RxReactiveStreams.toPublisher(single))
.assertNoValues()
.assertError(ServerWebInputException.class);
single = resolveValueWithEmptyBody(type, false);
TestSubscriber.subscribe(RxJava1Adapter.singleToMono(single))
TestSubscriber.subscribe(RxReactiveStreams.toPublisher(single))
.assertNoValues()
.assertError(ServerWebInputException.class);
}
@ -166,12 +166,12 @@ public class RequestBodyArgumentResolverTests {
ResolvableType type = forClassWithGenerics(Observable.class, String.class);
Observable<String> observable = resolveValueWithEmptyBody(type, true);
TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(observable))
TestSubscriber.subscribe(RxReactiveStreams.toPublisher(observable))
.assertNoValues()
.assertError(ServerWebInputException.class);
observable = resolveValueWithEmptyBody(type, false);
TestSubscriber.subscribe(RxJava1Adapter.observableToFlux(observable))
TestSubscriber.subscribe(RxReactiveStreams.toPublisher(observable))
.assertNoValues()
.assertComplete();
}

View File

@ -24,9 +24,9 @@ import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.Assert;
@ -64,7 +64,7 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
})
.doOnSuccess(aVoid -> logger.debug("Successfully completed request"));
return RxJava1Adapter.publisherToObservable(result);
return RxReactiveStreams.toObservable(result);
}
}

View File

@ -23,9 +23,9 @@ import java.net.URISyntaxException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.cookie.Cookie;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -106,7 +106,7 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
@Override
public Flux<DataBuffer> getBody() {
Observable<DataBuffer> content = this.request.getContent().map(dataBufferFactory::wrap);
return RxJava1Adapter.observableToFlux(content);
return Flux.from(RxReactiveStreams.toPublisher(content));
}
}

View File

@ -24,10 +24,10 @@ import io.netty.handler.codec.http.cookie.DefaultCookie;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.ResponseContentWriter;
import org.reactivestreams.Publisher;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -72,9 +72,9 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
@Override
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
Observable<ByteBuf> content = RxJava1Adapter.publisherToObservable(body)
Observable<ByteBuf> content = RxReactiveStreams.toObservable(body)
.map(NettyDataBufferFactory::toByteBuf);
return RxJava1Adapter.observableToFlux(this.response.write(content))
return Flux.from(RxReactiveStreams.toPublisher(this.response.write(content)))
.then();
}
@ -85,9 +85,9 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
flatMap(publisher -> Flux.from(publisher).
map(NettyDataBufferFactory::toByteBuf).
concatWith(Mono.just(FLUSH_SIGNAL)));
Observable<ByteBuf> content = RxJava1Adapter.publisherToObservable(bodyWithFlushSignals);
Observable<ByteBuf> content = RxReactiveStreams.toObservable(bodyWithFlushSignals);
ResponseContentWriter<ByteBuf> writer = this.response.write(content, bb -> bb == FLUSH_SIGNAL);
return RxJava1Adapter.observableToFlux(writer).then();
return Flux.from(RxReactiveStreams.toPublisher(writer)).then();
}
@Override