Remove support for RxJava 2.x as well

Includes realignment of Reactor adapter registration.

Closes gh-27443
This commit is contained in:
Juergen Hoeller 2021-09-27 13:01:32 +02:00
parent 30efa4d478
commit e6112344d2
5 changed files with 99 additions and 248 deletions

View File

@ -64,7 +64,6 @@ configure(allprojects) { project ->
entry 'groovy-xml'
}
dependency "io.reactivex.rxjava2:rxjava:2.2.21"
dependency "io.reactivex.rxjava3:rxjava:3.1.1"
dependency "io.smallrye.reactive:mutiny:1.0.0"
dependency "io.projectreactor.tools:blockhound:1.0.4.RELEASE"

View File

@ -44,7 +44,6 @@ dependencies {
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
optional("io.projectreactor:reactor-core")
optional("io.reactivex.rxjava2:rxjava")
optional("io.reactivex.rxjava3:rxjava")
optional("io.smallrye.reactive:mutiny")
optional("io.netty:netty-buffer")

View File

@ -58,8 +58,6 @@ public class ReactiveAdapterRegistry {
private static final boolean reactorPresent;
private static final boolean rxjava2Present;
private static final boolean rxjava3Present;
private static final boolean kotlinCoroutinesPresent;
@ -69,7 +67,6 @@ public class ReactiveAdapterRegistry {
static {
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader);
rxjava2Present = ClassUtils.isPresent("io.reactivex.Flowable", classLoader);
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader);
@ -86,13 +83,9 @@ public class ReactiveAdapterRegistry {
// Reactor
if (reactorPresent) {
new ReactorRegistrar().registerAdapters(this);
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
}
// RxJava
if (rxjava2Present) {
new RxJava2Registrar().registerAdapters(this);
}
if (rxjava3Present) {
new RxJava3Registrar().registerAdapters(this);
}
@ -200,123 +193,6 @@ public class ReactiveAdapterRegistry {
}
private static class ReactorRegistrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
// Register Flux and Mono before Publisher...
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
source -> (Mono<?>) source,
Mono::from
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
source -> (Flux<?>) source,
Flux::from);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
source -> (Publisher<?>) source,
source -> source);
registry.registerReactiveType(
ReactiveTypeDescriptor.nonDeferredAsyncValue(CompletionStage.class, EmptyCompletableFuture::new),
source -> Mono.fromCompletionStage((CompletionStage<?>) source),
source -> Mono.from(source).toFuture()
);
}
}
private static class ReactorJdkFlowAdapterRegistrar {
void registerAdapter(ReactiveAdapterRegistry registry) {
Flow.Publisher<?> emptyFlow = JdkFlowAdapter.publisherToFlowPublisher(Flux.empty());
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> emptyFlow),
source -> JdkFlowAdapter.flowPublisherToFlux((Flow.Publisher<?>) source),
JdkFlowAdapter::publisherToFlowPublisher
);
}
}
private static class RxJava2Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
source -> (io.reactivex.Flowable<?>) source,
io.reactivex.Flowable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
source -> ((io.reactivex.Observable<?>) source).toFlowable(io.reactivex.BackpressureStrategy.BUFFER),
io.reactivex.Observable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single<?>) source).toFlowable(),
io.reactivex.Single::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable().singleElement()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
io.reactivex.Completable::fromPublisher
);
}
}
private static class RxJava3Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Flowable.class,
io.reactivex.rxjava3.core.Flowable::empty),
source -> (io.reactivex.rxjava3.core.Flowable<?>) source,
io.reactivex.rxjava3.core.Flowable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Observable.class,
io.reactivex.rxjava3.core.Observable::empty),
source -> ((io.reactivex.rxjava3.core.Observable<?>) source).toFlowable(
io.reactivex.rxjava3.core.BackpressureStrategy.BUFFER),
io.reactivex.rxjava3.core.Observable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.rxjava3.core.Single.class),
source -> ((io.reactivex.rxjava3.core.Single<?>) source).toFlowable(),
io.reactivex.rxjava3.core.Single::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(
io.reactivex.rxjava3.core.Maybe.class,
io.reactivex.rxjava3.core.Maybe::empty),
source -> ((io.reactivex.rxjava3.core.Maybe<?>) source).toFlowable(),
io.reactivex.rxjava3.core.Maybe::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(
io.reactivex.rxjava3.core.Completable.class,
io.reactivex.rxjava3.core.Completable::complete),
source -> ((io.reactivex.rxjava3.core.Completable) source).toFlowable(),
io.reactivex.rxjava3.core.Completable::fromPublisher
);
}
}
/**
* ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or
* {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}.
@ -340,6 +216,41 @@ public class ReactiveAdapterRegistry {
}
private static class ReactorRegistrar {
private static final Flow.Publisher<?> EMPTY_FLOW = JdkFlowAdapter.publisherToFlowPublisher(Flux.empty());
void registerAdapters(ReactiveAdapterRegistry registry) {
// Register Flux and Mono before Publisher...
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
source -> (Mono<?>) source,
Mono::from);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
source -> (Flux<?>) source,
Flux::from);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
source -> (Publisher<?>) source,
source -> source);
registry.registerReactiveType(
ReactiveTypeDescriptor.nonDeferredAsyncValue(CompletionStage.class, EmptyCompletableFuture::new),
source -> Mono.fromCompletionStage((CompletionStage<?>) source),
source -> Mono.from(source).toFuture());
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> EMPTY_FLOW),
source -> JdkFlowAdapter.flowPublisherToFlux((Flow.Publisher<?>) source),
JdkFlowAdapter::publisherToFlowPublisher);
}
}
private static class EmptyCompletableFuture<T> extends CompletableFuture<T> {
EmptyCompletableFuture() {
@ -348,6 +259,46 @@ public class ReactiveAdapterRegistry {
}
private static class RxJava3Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Flowable.class,
io.reactivex.rxjava3.core.Flowable::empty),
source -> (io.reactivex.rxjava3.core.Flowable<?>) source,
io.reactivex.rxjava3.core.Flowable::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Observable.class,
io.reactivex.rxjava3.core.Observable::empty),
source -> ((io.reactivex.rxjava3.core.Observable<?>) source).toFlowable(
io.reactivex.rxjava3.core.BackpressureStrategy.BUFFER),
io.reactivex.rxjava3.core.Observable::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.rxjava3.core.Single.class),
source -> ((io.reactivex.rxjava3.core.Single<?>) source).toFlowable(),
io.reactivex.rxjava3.core.Single::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(
io.reactivex.rxjava3.core.Maybe.class,
io.reactivex.rxjava3.core.Maybe::empty),
source -> ((io.reactivex.rxjava3.core.Maybe<?>) source).toFlowable(),
io.reactivex.rxjava3.core.Maybe::fromPublisher);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(
io.reactivex.rxjava3.core.Completable.class,
io.reactivex.rxjava3.core.Completable::complete),
source -> ((io.reactivex.rxjava3.core.Completable) source).toFlowable(),
io.reactivex.rxjava3.core.Completable::fromPublisher);
}
}
private static class CoroutinesRegistrar {
@SuppressWarnings("KotlinInternalInJava")
@ -361,8 +312,7 @@ public class ReactiveAdapterRegistry {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow),
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow
);
kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow);
}
}
@ -375,16 +325,14 @@ public class ReactiveAdapterRegistry {
io.smallrye.mutiny.Uni.class,
() -> io.smallrye.mutiny.Uni.createFrom().nothing()),
uni -> ((io.smallrye.mutiny.Uni<?>) uni).convert().toPublisher(),
publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher)
);
publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher));
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.smallrye.mutiny.Multi.class,
() -> io.smallrye.mutiny.Multi.createFrom().empty()),
multi -> (io.smallrye.mutiny.Multi<?>) multi,
publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher)
);
publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher));
}
}
@ -402,7 +350,6 @@ public class ReactiveAdapterRegistry {
@Override
public void applyTo(BlockHound.Builder builder) {
// Avoid hard references potentially anywhere in spring-core (no need for structural dependency)
builder.allowBlockingCallsInside(

View File

@ -43,16 +43,13 @@ class ReactiveAdapterRegistryTests {
private static final Duration ONE_SECOND = Duration.ofSeconds(1);
private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
@Test
void getAdapterForReactiveSubType() {
ReactiveAdapter adapter1 = getAdapter(Flux.class);
ReactiveAdapter adapter2 = getAdapter(ExtendedFlux.class);
assertThat(adapter2).isSameAs(adapter1);
this.registry.registerReactiveType(
@ -61,17 +58,32 @@ class ReactiveAdapterRegistryTests {
ExtendedFlux::from);
ReactiveAdapter adapter3 = getAdapter(ExtendedFlux.class);
assertThat(adapter3).isNotNull();
assertThat(adapter3).isNotSameAs(adapter1);
}
private ReactiveAdapter getAdapter(Class<?> reactiveType) {
ReactiveAdapter adapter = this.registry.getAdapter(reactiveType);
assertThat(adapter).isNotNull();
return adapter;
}
private static class ExtendedFlux<T> extends Flux<T> {
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
throw new UnsupportedOperationException();
}
}
@Nested
class Reactor {
@Test
void defaultAdapterRegistrations() {
// Reactor
assertThat(getAdapter(Mono.class)).isNotNull();
assertThat(getAdapter(Flux.class)).isNotNull();
@ -118,95 +130,12 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class RxJava2 {
@Test
void defaultAdapterRegistrations() {
// RxJava 2
assertThat(getAdapter(io.reactivex.Flowable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Observable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Single.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Maybe.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Completable.class)).isNotNull();
}
@Test
void toFlowable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).fromPublisher(source);
assertThat(target instanceof io.reactivex.Flowable).isTrue();
assertThat(((io.reactivex.Flowable<?>) target).toList().blockingGet()).isEqualTo(sequence);
}
@Test
void toObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).fromPublisher(source);
assertThat(target instanceof io.reactivex.Observable).isTrue();
assertThat(((io.reactivex.Observable<?>) target).toList().blockingGet()).isEqualTo(sequence);
}
@Test
void toSingle() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1});
Object target = getAdapter(io.reactivex.Single.class).fromPublisher(source);
assertThat(target instanceof io.reactivex.Single).isTrue();
assertThat(((io.reactivex.Single<Integer>) target).blockingGet()).isEqualTo(Integer.valueOf(1));
}
@Test
void toCompletable() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1, 2, 3});
Object target = getAdapter(io.reactivex.Completable.class).fromPublisher(source);
assertThat(target instanceof io.reactivex.Completable).isTrue();
((io.reactivex.Completable) target).blockingAwait();
}
@Test
void fromFlowable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = io.reactivex.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
void fromObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = io.reactivex.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
void fromSingle() {
Object source = io.reactivex.Single.just(1);
Object target = getAdapter(io.reactivex.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
@Test
void fromCompletable() {
Object source = io.reactivex.Completable.complete();
Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(ONE_SECOND);
}
}
@Nested
class RxJava3 {
@Test
void defaultAdapterRegistrations() {
// RxJava 3
assertThat(getAdapter(io.reactivex.rxjava3.core.Flowable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.rxjava3.core.Observable.class)).isNotNull();
@ -284,12 +213,12 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class Kotlin {
@Test
void defaultAdapterRegistrations() {
// Coroutines
assertThat(getAdapter(Deferred.class)).isNotNull();
}
@ -302,6 +231,7 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class Mutiny {
@ -344,22 +274,6 @@ class ReactiveAdapterRegistryTests {
assertThat(target).isInstanceOf(Flux.class);
assertThat(((Flux<Integer>) target).blockLast(ONE_SECOND)).isEqualTo(Integer.valueOf(3));
}
}
private ReactiveAdapter getAdapter(Class<?> reactiveType) {
ReactiveAdapter adapter = this.registry.getAdapter(reactiveType);
assertThat(adapter).isNotNull();
return adapter;
}
private static class ExtendedFlux<T> extends Flux<T> {
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
throw new UnsupportedOperationException();
}
}
}

View File

@ -34,8 +34,6 @@ discussion of mock objects.
response objects to provide support for testing WebFlux applications without an HTTP
server. You can use the `WebTestClient` for end-to-end integration tests, too.
include::rsocket.adoc[leveloffset=+1]
@ -55,14 +53,8 @@ For annotated controllers, WebFlux transparently adapts to the reactive library
the application. This is done with the help of the
{api-spring-framework}/core/ReactiveAdapterRegistry.html[`ReactiveAdapterRegistry`], which
provides pluggable support for reactive library and other asynchronous types. The registry
has built-in support for RxJava 2/3, RxJava 1 (via RxJava Reactive Streams bridge), and
`CompletableFuture`, but you can register others, too.
[NOTE]
====
As of Spring Framework 5.3, support for RxJava 1 is deprecated.
====
has built-in support for RxJava 3, Kotlin coroutines and SmallRye Mutiny, but you can
register others, too.
For functional APIs (such as <<webflux-fn>>, the `WebClient`, and others), the general rules
for WebFlux APIs apply -- `Flux` and `Mono` as return values and a Reactive Streams