Deprecate RxJava 2 in favor of RxJava 3

Closes gh-27474
This commit is contained in:
Juergen Hoeller 2021-09-27 16:56:28 +02:00
parent 9ff0d717f2
commit b0c424b376
3 changed files with 191 additions and 207 deletions

View File

@ -44,12 +44,11 @@ import org.springframework.util.ReflectionUtils;
* {@code Observable}, and others.
*
* <p>By default, depending on classpath availability, adapters are registered
* for Reactor, RxJava 2/3, or RxJava 1 (+ RxJava Reactive Streams bridge),
* {@link CompletableFuture}, Java 9+ {@code Flow.Publisher}, and Kotlin
* Coroutines' {@code Deferred} and {@code Flow}.
* for Reactor, RxJava 3, {@link CompletableFuture}, {@code Flow.Publisher},
* and Kotlin Coroutines' {@code Deferred} and {@code Flow}.
*
* <p><strong>Note:</strong> As of Spring Framework 5.3, support for RxJava 1.x
* is deprecated in favor of RxJava 2 and 3.
* <p><strong>Note:</strong> As of Spring Framework 5.3.11, support for
* RxJava 1.x and 2.x is deprecated in favor of RxJava 3.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
@ -103,15 +102,13 @@ public class ReactiveAdapterRegistry {
}
}
// RxJava1 (deprecated)
// RxJava
if (rxjava1Present) {
new RxJava1Registrar().registerAdapters(this);
}
// RxJava2
if (rxjava2Present) {
new RxJava2Registrar().registerAdapters(this);
}
// RxJava3
if (rxjava3Present) {
new RxJava3Registrar().registerAdapters(this);
}
@ -219,161 +216,6 @@ public class ReactiveAdapterRegistry {
}
private static class ReactorRegistrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
// Register Flux and Mono before Publisher...
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
source -> (Mono<?>) source,
Mono::from
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
source -> (Flux<?>) source,
Flux::from);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
source -> (Publisher<?>) source,
source -> source);
registry.registerReactiveType(
ReactiveTypeDescriptor.nonDeferredAsyncValue(CompletionStage.class, EmptyCompletableFuture::new),
source -> Mono.fromCompletionStage((CompletionStage<?>) source),
source -> Mono.from(source).toFuture()
);
}
}
private static class ReactorJdkFlowAdapterRegistrar {
void registerAdapter(ReactiveAdapterRegistry registry) {
// Reflectively access optional JDK 9+ API (for runtime compatibility with JDK 8)
try {
String publisherName = "java.util.concurrent.Flow.Publisher";
Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader());
String adapterName = "reactor.adapter.JdkFlowAdapter";
Class<?> flowAdapterClass = ClassUtils.forName(adapterName, getClass().getClassLoader());
Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass);
Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class);
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty());
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow),
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source),
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher)
);
}
catch (Throwable ex) {
// Ignore
}
}
}
private static class RxJava1Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty),
source -> RxReactiveStreams.toPublisher((rx.Observable<?>) source),
RxReactiveStreams::toObservable
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class),
source -> RxReactiveStreams.toPublisher((rx.Single<?>) source),
RxReactiveStreams::toSingle
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(rx.Completable.class, rx.Completable::complete),
source -> RxReactiveStreams.toPublisher((rx.Completable) source),
RxReactiveStreams::toCompletable
);
}
}
private static class RxJava2Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
source -> (io.reactivex.Flowable<?>) source,
io.reactivex.Flowable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
source -> ((io.reactivex.Observable<?>) source).toFlowable(io.reactivex.BackpressureStrategy.BUFFER),
io.reactivex.Observable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single<?>) source).toFlowable(),
io.reactivex.Single::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable().singleElement()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
io.reactivex.Completable::fromPublisher
);
}
}
private static class RxJava3Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Flowable.class,
io.reactivex.rxjava3.core.Flowable::empty),
source -> (io.reactivex.rxjava3.core.Flowable<?>) source,
io.reactivex.rxjava3.core.Flowable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Observable.class,
io.reactivex.rxjava3.core.Observable::empty),
source -> ((io.reactivex.rxjava3.core.Observable<?>) source).toFlowable(
io.reactivex.rxjava3.core.BackpressureStrategy.BUFFER),
io.reactivex.rxjava3.core.Observable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.rxjava3.core.Single.class),
source -> ((io.reactivex.rxjava3.core.Single<?>) source).toFlowable(),
io.reactivex.rxjava3.core.Single::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(
io.reactivex.rxjava3.core.Maybe.class,
io.reactivex.rxjava3.core.Maybe::empty),
source -> ((io.reactivex.rxjava3.core.Maybe<?>) source).toFlowable(),
io.reactivex.rxjava3.core.Maybe::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(
io.reactivex.rxjava3.core.Completable.class,
io.reactivex.rxjava3.core.Completable::complete),
source -> ((io.reactivex.rxjava3.core.Completable) source).toFlowable(),
io.reactivex.rxjava3.core.Completable::fromPublisher
);
}
}
/**
* ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or
* {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}.
@ -397,6 +239,34 @@ public class ReactiveAdapterRegistry {
}
private static class ReactorRegistrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
// Register Flux and Mono before Publisher...
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
source -> (Mono<?>) source,
Mono::from);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
source -> (Flux<?>) source,
Flux::from);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
source -> (Publisher<?>) source,
source -> source);
registry.registerReactiveType(
ReactiveTypeDescriptor.nonDeferredAsyncValue(CompletionStage.class, EmptyCompletableFuture::new),
source -> Mono.fromCompletionStage((CompletionStage<?>) source),
source -> Mono.from(source).toFuture());
}
}
private static class EmptyCompletableFuture<T> extends CompletableFuture<T> {
EmptyCompletableFuture() {
@ -405,6 +275,127 @@ public class ReactiveAdapterRegistry {
}
private static class ReactorJdkFlowAdapterRegistrar {
void registerAdapter(ReactiveAdapterRegistry registry) {
// Reflectively access optional JDK 9+ API (for runtime compatibility with JDK 8)
try {
String publisherName = "java.util.concurrent.Flow.Publisher";
Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader());
String adapterName = "reactor.adapter.JdkFlowAdapter";
Class<?> flowAdapterClass = ClassUtils.forName(adapterName, getClass().getClassLoader());
Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass);
Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class);
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty());
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow),
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source),
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher));
}
catch (Throwable ex) {
// Ignore
}
}
}
private static class RxJava1Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty),
source -> RxReactiveStreams.toPublisher((rx.Observable<?>) source),
RxReactiveStreams::toObservable);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class),
source -> RxReactiveStreams.toPublisher((rx.Single<?>) source),
RxReactiveStreams::toSingle);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(rx.Completable.class, rx.Completable::complete),
source -> RxReactiveStreams.toPublisher((rx.Completable) source),
RxReactiveStreams::toCompletable);
}
}
private static class RxJava2Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
source -> (io.reactivex.Flowable<?>) source,
io.reactivex.Flowable::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
source -> ((io.reactivex.Observable<?>) source).toFlowable(io.reactivex.BackpressureStrategy.BUFFER),
io.reactivex.Observable::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single<?>) source).toFlowable(),
io.reactivex.Single::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable().singleElement());
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
io.reactivex.Completable::fromPublisher);
}
}
private static class RxJava3Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Flowable.class,
io.reactivex.rxjava3.core.Flowable::empty),
source -> (io.reactivex.rxjava3.core.Flowable<?>) source,
io.reactivex.rxjava3.core.Flowable::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Observable.class,
io.reactivex.rxjava3.core.Observable::empty),
source -> ((io.reactivex.rxjava3.core.Observable<?>) source).toFlowable(
io.reactivex.rxjava3.core.BackpressureStrategy.BUFFER),
io.reactivex.rxjava3.core.Observable::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.rxjava3.core.Single.class),
source -> ((io.reactivex.rxjava3.core.Single<?>) source).toFlowable(),
io.reactivex.rxjava3.core.Single::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(
io.reactivex.rxjava3.core.Maybe.class,
io.reactivex.rxjava3.core.Maybe::empty),
source -> ((io.reactivex.rxjava3.core.Maybe<?>) source).toFlowable(),
io.reactivex.rxjava3.core.Maybe::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(
io.reactivex.rxjava3.core.Completable.class,
io.reactivex.rxjava3.core.Completable::complete),
source -> ((io.reactivex.rxjava3.core.Completable) source).toFlowable(),
io.reactivex.rxjava3.core.Completable::fromPublisher);
}
}
private static class CoroutinesRegistrar {
@SuppressWarnings("KotlinInternalInJava")
@ -418,8 +409,7 @@ public class ReactiveAdapterRegistry {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow),
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow
);
kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow);
}
}
@ -432,16 +422,14 @@ public class ReactiveAdapterRegistry {
io.smallrye.mutiny.Uni.class,
() -> io.smallrye.mutiny.Uni.createFrom().nothing()),
uni -> ((io.smallrye.mutiny.Uni<?>) uni).convert().toPublisher(),
publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher)
);
publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher));
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.smallrye.mutiny.Multi.class,
() -> io.smallrye.mutiny.Multi.createFrom().empty()),
multi -> (io.smallrye.mutiny.Multi<?>) multi,
publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher)
);
publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher));
}
}
@ -459,7 +447,6 @@ public class ReactiveAdapterRegistry {
@Override
public void applyTo(BlockHound.Builder builder) {
// Avoid hard references potentially anywhere in spring-core (no need for structural dependency)
builder.allowBlockingCallsInside(

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,6 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Unit tests for {@link ReactiveAdapterRegistry}.
*
* @author Rossen Stoyanchev
*/
@SuppressWarnings("unchecked")
@ -42,16 +43,13 @@ class ReactiveAdapterRegistryTests {
private static final Duration ONE_SECOND = Duration.ofSeconds(1);
private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
@Test
void getAdapterForReactiveSubType() {
ReactiveAdapter adapter1 = getAdapter(Flux.class);
ReactiveAdapter adapter2 = getAdapter(ExtendedFlux.class);
assertThat(adapter2).isSameAs(adapter1);
this.registry.registerReactiveType(
@ -60,17 +58,32 @@ class ReactiveAdapterRegistryTests {
ExtendedFlux::from);
ReactiveAdapter adapter3 = getAdapter(ExtendedFlux.class);
assertThat(adapter3).isNotNull();
assertThat(adapter3).isNotSameAs(adapter1);
}
private ReactiveAdapter getAdapter(Class<?> reactiveType) {
ReactiveAdapter adapter = this.registry.getAdapter(reactiveType);
assertThat(adapter).isNotNull();
return adapter;
}
private static class ExtendedFlux<T> extends Flux<T> {
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
throw new UnsupportedOperationException();
}
}
@Nested
class Reactor {
@Test
void defaultAdapterRegistrations() {
// Reactor
assertThat(getAdapter(Mono.class)).isNotNull();
assertThat(getAdapter(Flux.class)).isNotNull();
@ -117,6 +130,7 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class RxJava1 {
@ -178,12 +192,12 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class RxJava2 {
@Test
void defaultAdapterRegistrations() {
// RxJava 2
assertThat(getAdapter(io.reactivex.Flowable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Observable.class)).isNotNull();
@ -261,12 +275,12 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class RxJava3 {
@Test
void defaultAdapterRegistrations() {
// RxJava 3
assertThat(getAdapter(io.reactivex.rxjava3.core.Flowable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.rxjava3.core.Observable.class)).isNotNull();
@ -344,12 +358,12 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class Kotlin {
@Test
void defaultAdapterRegistrations() {
// Coroutines
assertThat(getAdapter(Deferred.class)).isNotNull();
}
@ -362,6 +376,7 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class Mutiny {
@ -404,22 +419,6 @@ class ReactiveAdapterRegistryTests {
assertThat(target).isInstanceOf(Flux.class);
assertThat(((Flux<Integer>) target).blockLast(ONE_SECOND)).isEqualTo(Integer.valueOf(3));
}
}
private ReactiveAdapter getAdapter(Class<?> reactiveType) {
ReactiveAdapter adapter = this.registry.getAdapter(reactiveType);
assertThat(adapter).isNotNull();
return adapter;
}
private static class ExtendedFlux<T> extends Flux<T> {
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
throw new UnsupportedOperationException();
}
}
}

View File

@ -34,8 +34,6 @@ discussion of mock objects.
response objects to provide support for testing WebFlux applications without an HTTP
server. You can use the `WebTestClient` for end-to-end integration tests, too.
include::rsocket.adoc[leveloffset=+1]
@ -51,21 +49,21 @@ it helps to express cardinality -- for example, whether a single or multiple asy
values are expected, and that can be essential for making decisions (for example, when
encoding or decoding HTTP messages).
For annotated controllers, WebFlux transparently adapts to the reactive library chosen by
the application. This is done with the help of the
{api-spring-framework}/core/ReactiveAdapterRegistry.html[`ReactiveAdapterRegistry`], which
provides pluggable support for reactive library and other asynchronous types. The registry
has built-in support for RxJava 2/3, RxJava 1 (via RxJava Reactive Streams bridge), and
`CompletableFuture`, but you can register others, too.
For annotated controllers, WebFlux transparently adapts to the reactive library chosen
by the application. This is done with the help of the
{api-spring-framework}/core/ReactiveAdapterRegistry.html[`ReactiveAdapterRegistry`]
which provides pluggable support for reactive library and other asynchronous types.
The registry has built-in support for RxJava 3, Kotlin coroutines and SmallRye Mutiny,
but you can register other third-party adapters as well.
[NOTE]
====
As of Spring Framework 5.3, support for RxJava 1 is deprecated.
As of Spring Framework 5.3.11, support for RxJava 1 and 2 is deprecated, following
RxJava's own EOL advice and the upgrade recommendation towards RxJava 3.
====
For functional APIs (such as <<webflux-fn>>, the `WebClient`, and others), the general rules
for WebFlux APIs apply -- `Flux` and `Mono` as return values and a Reactive Streams
For functional APIs (such as <<webflux-fn>>, the `WebClient`, and others), the general
rules for WebFlux APIs apply -- `Flux` and `Mono` as return values and a Reactive Streams
`Publisher` as input. When a `Publisher`, whether custom or from another reactive library,
is provided, it can be treated only as a stream with unknown semantics (0..N). If, however,
the semantics are known, you can wrap it with `Flux` or `Mono.from(Publisher)` instead