Add support for RxJava 3

Closes gh-24170
This commit is contained in:
Rossen Stoyanchev 2020-05-11 08:40:01 +01:00
parent 3f30dcae80
commit 7f59381c7d
29 changed files with 450 additions and 500 deletions

View File

@ -58,6 +58,7 @@ configure(allprojects) { project ->
dependency "io.reactivex:rxjava:1.3.8"
dependency "io.reactivex:rxjava-reactive-streams:1.2.1"
dependency "io.reactivex.rxjava2:rxjava:2.2.19"
dependency "io.reactivex.rxjava3:rxjava:3.0.3"
dependency "io.projectreactor.tools:blockhound:1.0.2.RELEASE"
dependency "com.caucho:hessian:4.0.62"

View File

@ -52,6 +52,7 @@ dependencies {
optional("io.reactivex:rxjava")
optional("io.reactivex:rxjava-reactive-streams")
optional("io.reactivex.rxjava2:rxjava")
optional("io.reactivex.rxjava3:rxjava")
optional("io.netty:netty-buffer")
testCompile("io.projectreactor:reactor-test")
testCompile("javax.annotation:javax.annotation-api")

View File

@ -24,8 +24,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import kotlinx.coroutines.CompletableDeferredKt;
import kotlinx.coroutines.Deferred;
import org.reactivestreams.Publisher;
@ -46,8 +44,9 @@ import org.springframework.util.ReflectionUtils;
* {@code Observable}, and others.
*
* <p>By default, depending on classpath availability, adapters are registered
* for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, Java 9+
* {@code Flow.Publisher} and Kotlin Coroutines {@code Deferred} and {@code Flow}.
* for Reactor, RxJava 2/3, or RxJava 1 (+ RxJava Reactive Streams bridge),
* {@link CompletableFuture}, Java 9+ {@code Flow.Publisher}, and Kotlin
* Coroutines' {@code Deferred} and {@code Flow}.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
@ -89,6 +88,11 @@ public class ReactiveAdapterRegistry {
new RxJava2Registrar().registerAdapters(this);
}
// RxJava3
if (ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader)) {
new RxJava3Registrar().registerAdapters(this);
}
// Java 9+ Flow.Publisher
if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader)) {
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
@ -104,9 +108,7 @@ public class ReactiveAdapterRegistry {
/**
* Whether the registry has any adapters which would be the case if any of
* Reactor, RxJava 2, or RxJava 1 (+ RxJava Reactive Streams bridge) are
* present on the classpath.
* Whether the registry has any adapters.
*/
public boolean hasAdapters() {
return !this.adapters.isEmpty();
@ -254,31 +256,78 @@ public class ReactiveAdapterRegistry {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
source -> (io.reactivex.Flowable<?>) source,
Flowable::fromPublisher
io.reactivex.Flowable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
source -> ((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable()
source -> ((io.reactivex.Observable<?>) source).toFlowable(io.reactivex.BackpressureStrategy.BUFFER),
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable().singleElement().toSingle()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement()
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable().singleElement()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements()
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable().ignoreElements()
);
}
}
private static class RxJava3Registrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Flowable.class,
io.reactivex.rxjava3.core.Flowable::empty),
source -> (io.reactivex.rxjava3.core.Flowable<?>) source,
io.reactivex.rxjava3.core.Flowable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.reactivex.rxjava3.core.Observable.class,
io.reactivex.rxjava3.core.Observable::empty),
source -> ((io.reactivex.rxjava3.core.Observable<?>) source).toFlowable(
io.reactivex.rxjava3.core.BackpressureStrategy.BUFFER),
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
.toObservable()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.rxjava3.core.Single.class),
source -> ((io.reactivex.rxjava3.core.Single<?>) source).toFlowable(),
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
.toObservable().singleElement().toSingle()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(
io.reactivex.rxjava3.core.Maybe.class,
io.reactivex.rxjava3.core.Maybe::empty),
source -> ((io.reactivex.rxjava3.core.Maybe<?>) source).toFlowable(),
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
.toObservable().singleElement()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(
io.reactivex.rxjava3.core.Completable.class,
io.reactivex.rxjava3.core.Completable::complete),
source -> ((io.reactivex.rxjava3.core.Completable) source).toFlowable(),
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
.toObservable().ignoreElements()
);
}
}
private static class ReactorJdkFlowAdapterRegistrar {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -23,8 +23,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

View File

@ -21,17 +21,13 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import kotlinx.coroutines.Deferred;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.Single;
import static org.assertj.core.api.Assertions.assertThat;
@ -45,35 +41,6 @@ class ReactiveAdapterRegistryTests {
private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
@Test
void defaultAdapterRegistrations() {
// Reactor
assertThat(getAdapter(Mono.class)).isNotNull();
assertThat(getAdapter(Flux.class)).isNotNull();
// Publisher
assertThat(getAdapter(Publisher.class)).isNotNull();
// Completable
assertThat(getAdapter(CompletableFuture.class)).isNotNull();
// RxJava 1
assertThat(getAdapter(Observable.class)).isNotNull();
assertThat(getAdapter(Single.class)).isNotNull();
assertThat(getAdapter(Completable.class)).isNotNull();
// RxJava 2
assertThat(getAdapter(Flowable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Observable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Single.class)).isNotNull();
assertThat(getAdapter(Maybe.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Completable.class)).isNotNull();
// Coroutines
assertThat(getAdapter(Deferred.class)).isNotNull();
}
@Test
void getAdapterForReactiveSubType() {
@ -93,193 +60,302 @@ class ReactiveAdapterRegistryTests {
assertThat(adapter3).isNotSameAs(adapter1);
}
@Nested
class Reactor {
@Test
void publisherToFlux() {
void defaultAdapterRegistrations() {
// Reactor
assertThat(getAdapter(Mono.class)).isNotNull();
assertThat(getAdapter(Flux.class)).isNotNull();
// Publisher
assertThat(getAdapter(Publisher.class)).isNotNull();
// Completable
assertThat(getAdapter(CompletableFuture.class)).isNotNull();
}
@Test
void toFlux() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flowable.fromIterable(sequence);
Publisher<Integer> source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
Object target = getAdapter(Flux.class).fromPublisher(source);
boolean condition = target instanceof Flux;
assertThat(condition).isTrue();
assertThat(target instanceof Flux).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
}
// TODO: publisherToMono/CompletableFuture vs Single (ISE on multiple elements)?
@Test
void publisherToMono() {
Publisher<Integer> source = Flowable.fromArray(1, 2, 3);
void toMono() {
Publisher<Integer> source = io.reactivex.rxjava3.core.Flowable.fromArray(1, 2, 3);
Object target = getAdapter(Mono.class).fromPublisher(source);
boolean condition = target instanceof Mono;
assertThat(condition).isTrue();
assertThat(target instanceof Mono).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
}
@Test
void publisherToCompletableFuture() throws Exception {
Publisher<Integer> source = Flowable.fromArray(1, 2, 3);
void toCompletableFuture() throws Exception {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1, 2, 3});
Object target = getAdapter(CompletableFuture.class).fromPublisher(source);
boolean condition = target instanceof CompletableFuture;
assertThat(condition).isTrue();
assertThat(target instanceof CompletableFuture).isTrue();
assertThat(((CompletableFuture<Integer>) target).get()).isEqualTo(Integer.valueOf(1));
}
@Test
void publisherToRxObservable() {
void fromCompletableFuture() {
CompletableFuture<Integer> future = new CompletableFuture<>();
future.complete(1);
Object target = getAdapter(CompletableFuture.class).toPublisher(future);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
}
}
@Nested
class RxJava1 {
@Test
void defaultAdapterRegistrations() {
assertThat(getAdapter(rx.Observable.class)).isNotNull();
assertThat(getAdapter(rx.Single.class)).isNotNull();
assertThat(getAdapter(rx.Completable.class)).isNotNull();
}
@Test
void toObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flowable.fromIterable(sequence);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(rx.Observable.class).fromPublisher(source);
boolean condition = target instanceof Observable;
assertThat(condition).isTrue();
assertThat(((Observable<?>) target).toList().toBlocking().first()).isEqualTo(sequence);
assertThat(target instanceof rx.Observable).isTrue();
assertThat(((rx.Observable<?>) target).toList().toBlocking().first()).isEqualTo(sequence);
}
@Test
void publisherToRxSingle() {
Publisher<Integer> source = Flowable.fromArray(1);
void toSingle() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1});
Object target = getAdapter(rx.Single.class).fromPublisher(source);
boolean condition = target instanceof Single;
assertThat(condition).isTrue();
assertThat(((Single<Integer>) target).toBlocking().value()).isEqualTo(Integer.valueOf(1));
assertThat(target instanceof rx.Single).isTrue();
assertThat(((rx.Single<Integer>) target).toBlocking().value()).isEqualTo(Integer.valueOf(1));
}
@Test
void publisherToRxCompletable() {
Publisher<Integer> source = Flowable.fromArray(1, 2, 3);
void toCompletable() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1, 2, 3});
Object target = getAdapter(rx.Completable.class).fromPublisher(source);
boolean condition = target instanceof Completable;
assertThat(condition).isTrue();
assertThat(((Completable) target).get()).isNull();
assertThat(target instanceof rx.Completable).isTrue();
assertThat(((rx.Completable) target).get()).isNull();
}
@Test
void publisherToReactivexFlowable() {
void fromObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = rx.Observable.from(sequence);
Object target = getAdapter(rx.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
}
@Test
void fromSingle() {
Object source = rx.Single.just(1);
Object target = getAdapter(rx.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
}
@Test
void fromCompletable() {
Object source = rx.Completable.complete();
Object target = getAdapter(rx.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
}
}
@Nested
class RxJava2 {
@Test
void defaultAdapterRegistrations() {
// RxJava 2
assertThat(getAdapter(io.reactivex.Flowable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Observable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Single.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Maybe.class)).isNotNull();
assertThat(getAdapter(io.reactivex.Completable.class)).isNotNull();
}
@Test
void toFlowable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).fromPublisher(source);
boolean condition = target instanceof Flowable;
assertThat(condition).isTrue();
assertThat(((Flowable<?>) target).toList().blockingGet()).isEqualTo(sequence);
assertThat(target instanceof io.reactivex.Flowable).isTrue();
assertThat(((io.reactivex.Flowable<?>) target).toList().blockingGet()).isEqualTo(sequence);
}
@Test
void publisherToReactivexObservable() {
void toObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flowable.fromIterable(sequence);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).fromPublisher(source);
boolean condition = target instanceof io.reactivex.Observable;
assertThat(condition).isTrue();
assertThat(target instanceof io.reactivex.Observable).isTrue();
assertThat(((io.reactivex.Observable<?>) target).toList().blockingGet()).isEqualTo(sequence);
}
@Test
void publisherToReactivexSingle() {
Publisher<Integer> source = Flowable.fromArray(1);
void toSingle() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1});
Object target = getAdapter(io.reactivex.Single.class).fromPublisher(source);
boolean condition = target instanceof io.reactivex.Single;
assertThat(condition).isTrue();
assertThat(target instanceof io.reactivex.Single).isTrue();
assertThat(((io.reactivex.Single<Integer>) target).blockingGet()).isEqualTo(Integer.valueOf(1));
}
@Test
void publisherToReactivexCompletable() {
Publisher<Integer> source = Flowable.fromArray(1, 2, 3);
void toCompletable() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1, 2, 3});
Object target = getAdapter(io.reactivex.Completable.class).fromPublisher(source);
boolean condition = target instanceof io.reactivex.Completable;
assertThat(condition).isTrue();
assertThat(target instanceof io.reactivex.Completable).isTrue();
((io.reactivex.Completable) target).blockingAwait();
}
@Test
void rxObservableToPublisher() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = rx.Observable.from(sequence);
Object target = getAdapter(rx.Observable.class).toPublisher(source);
boolean condition = target instanceof Flux;
assertThat(condition).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
}
@Test
void rxSingleToPublisher() {
Object source = rx.Single.just(1);
Object target = getAdapter(rx.Single.class).toPublisher(source);
boolean condition = target instanceof Mono;
assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
}
@Test
void rxCompletableToPublisher() {
Object source = rx.Completable.complete();
Object target = getAdapter(rx.Completable.class).toPublisher(source);
boolean condition = target instanceof Mono;
assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
}
@Test
void reactivexFlowableToPublisher() {
void fromFlowable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = io.reactivex.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source);
boolean condition = target instanceof Flux;
assertThat(condition).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
}
@Test
void reactivexObservableToPublisher() {
void fromObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = io.reactivex.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source);
boolean condition = target instanceof Flux;
assertThat(condition).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
}
@Test
void reactivexSingleToPublisher() {
void fromSingle() {
Object source = io.reactivex.Single.just(1);
Object target = getAdapter(io.reactivex.Single.class).toPublisher(source);
boolean condition = target instanceof Mono;
assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
}
@Test
void reactivexCompletableToPublisher() {
void fromCompletable() {
Object source = io.reactivex.Completable.complete();
Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source);
boolean condition = target instanceof Mono;
assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
}
}
@Nested
class RxJava3 {
@Test
void defaultAdapterRegistrations() {
// RxJava 3
assertThat(getAdapter(io.reactivex.rxjava3.core.Flowable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.rxjava3.core.Observable.class)).isNotNull();
assertThat(getAdapter(io.reactivex.rxjava3.core.Single.class)).isNotNull();
assertThat(getAdapter(io.reactivex.rxjava3.core.Maybe.class)).isNotNull();
assertThat(getAdapter(io.reactivex.rxjava3.core.Completable.class)).isNotNull();
}
@Test
void completableFutureToPublisher() {
CompletableFuture<Integer> future = new CompletableFuture<>();
future.complete(1);
Object target = getAdapter(CompletableFuture.class).toPublisher(future);
boolean condition = target instanceof Mono;
assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
void toFlowable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).fromPublisher(source);
assertThat(target instanceof io.reactivex.rxjava3.core.Flowable).isTrue();
assertThat(((io.reactivex.rxjava3.core.Flowable<?>) target).toList().blockingGet()).isEqualTo(sequence);
}
@Test
void toObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).fromPublisher(source);
assertThat(target instanceof io.reactivex.rxjava3.core.Observable).isTrue();
assertThat(((io.reactivex.rxjava3.core.Observable<?>) target).toList().blockingGet()).isEqualTo(sequence);
}
@Test
void toSingle() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1});
Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).fromPublisher(source);
assertThat(target instanceof io.reactivex.rxjava3.core.Single).isTrue();
assertThat(((io.reactivex.rxjava3.core.Single<Integer>) target).blockingGet()).isEqualTo(Integer.valueOf(1));
}
@Test
void toCompletable() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1, 2, 3});
Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).fromPublisher(source);
assertThat(target instanceof io.reactivex.rxjava3.core.Completable).isTrue();
((io.reactivex.rxjava3.core.Completable) target).blockingAwait();
}
@Test
void fromFlowable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
}
@Test
void fromObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = io.reactivex.rxjava3.core.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
}
@Test
void fromSingle() {
Object source = io.reactivex.rxjava3.core.Single.just(1);
Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
}
@Test
void fromCompletable() {
Object source = io.reactivex.rxjava3.core.Completable.complete();
Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
}
}
@Nested
class Kotlin {
@Test
void defaultAdapterRegistrations() {
// Coroutines
assertThat(getAdapter(Deferred.class)).isNotNull();
}
@Test
void deferred() {
assertThat(getAdapter(CompletableFuture.class).getDescriptor().isDeferred()).isEqualTo(false);
assertThat(getAdapter(Mono.class).getDescriptor().isDeferred()).isEqualTo(true);
assertThat(getAdapter(Flux.class).getDescriptor().isDeferred()).isEqualTo(true);
assertThat(getAdapter(io.reactivex.Completable.class).getDescriptor().isDeferred()).isEqualTo(true);
assertThat(getAdapter(io.reactivex.Single.class).getDescriptor().isDeferred()).isEqualTo(true);
assertThat(getAdapter(io.reactivex.Flowable.class).getDescriptor().isDeferred()).isEqualTo(true);
assertThat(getAdapter(io.reactivex.Observable.class).getDescriptor().isDeferred()).isEqualTo(true);
assertThat(getAdapter(Deferred.class).getDescriptor().isDeferred()).isEqualTo(true);
assertThat(getAdapter(kotlinx.coroutines.flow.Flow.class).getDescriptor().isDeferred()).isEqualTo(true);
}
}
private ReactiveAdapter getAdapter(Class<?> reactiveType) {
ReactiveAdapter adapter = this.registry.getAdapter(reactiveType);

View File

@ -26,7 +26,7 @@ dependencies {
testCompile("org.apache.activemq:activemq-kahadb-store")
testCompile("org.apache.activemq:activemq-stomp")
testCompile("io.projectreactor:reactor-test")
testCompile "io.reactivex.rxjava2:rxjava"
testCompile "io.reactivex.rxjava3:rxjava"
testCompile("org.jetbrains.kotlin:kotlin-reflect")
testCompile("org.jetbrains.kotlin:kotlin-stdlib")
testCompile("org.xmlunit:xmlunit-assertj")

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,7 @@ package org.springframework.messaging.handler.invocation.reactive;
import java.util.Collections;
import io.reactivex.Completable;
import io.reactivex.rxjava3.core.Completable;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,9 +24,9 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.metadata.WellKnownMimeType;

View File

@ -18,9 +18,7 @@ dependencies {
optional("javax.xml.bind:jaxb-api")
optional("javax.xml.ws:jaxws-api")
optional("org.glassfish.main:javax.jws")
optional("io.reactivex:rxjava")
optional("io.reactivex:rxjava-reactive-streams")
optional("io.reactivex.rxjava2:rxjava")
optional("io.reactivex.rxjava3:rxjava")
optional("io.netty:netty-buffer")
optional("io.netty:netty-handler")
optional("io.netty:netty-codec-http") // Until Netty4ClientHttpRequest is removed

View File

@ -16,8 +16,6 @@ dependencies {
optional("org.freemarker:freemarker")
optional("com.fasterxml.jackson.core:jackson-databind")
optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile")
optional("io.reactivex:rxjava")
optional("io.reactivex:rxjava-reactive-streams")
optional("io.projectreactor.netty:reactor-netty")
optional("org.apache.tomcat:tomcat-websocket")
optional("org.eclipse.jetty.websocket:websocket-server") {
@ -38,7 +36,7 @@ dependencies {
testCompile("com.fasterxml:aalto-xml")
testCompile("org.hibernate:hibernate-validator")
testCompile("javax.validation:validation-api")
testCompile "io.reactivex.rxjava2:rxjava"
testCompile("io.reactivex.rxjava3:rxjava")
testCompile("io.projectreactor:reactor-test")
testCompile("io.undertow:undertow-core")
testCompile("org.apache.tomcat.embed:tomcat-embed-core")

View File

@ -30,7 +30,7 @@ import java.util.Map;
import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonView;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,7 +24,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,16 +21,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@ -80,12 +79,10 @@ public class HttpEntityMethodArgumentResolverTests {
testSupports(this.testMethod.arg(httpEntityType(String.class)));
testSupports(this.testMethod.arg(httpEntityType(Mono.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Single.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(io.reactivex.Single.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Maybe.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(CompletableFuture.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Flux.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Observable.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(io.reactivex.Observable.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Flowable.class, String.class)));
testSupports(this.testMethod.arg(forClassWithGenerics(RequestEntity.class, String.class)));
}
@ -132,17 +129,6 @@ public class HttpEntityMethodArgumentResolverTests {
ResolvableType type = httpEntityType(Single.class, String.class);
HttpEntity<Single<String>> entity = resolveValueWithEmptyBody(type);
StepVerifier.create(RxReactiveStreams.toPublisher(entity.getBody()))
.expectNextCount(0)
.expectError(ServerWebInputException.class)
.verify();
}
@Test
public void emptyBodyWithRxJava2Single() {
ResolvableType type = httpEntityType(io.reactivex.Single.class, String.class);
HttpEntity<io.reactivex.Single<String>> entity = resolveValueWithEmptyBody(type);
StepVerifier.create(entity.getBody().toFlowable())
.expectNextCount(0)
.expectError(ServerWebInputException.class)
@ -150,7 +136,7 @@ public class HttpEntityMethodArgumentResolverTests {
}
@Test
public void emptyBodyWithRxJava2Maybe() {
public void emptyBodyWithMaybe() {
ResolvableType type = httpEntityType(Maybe.class, String.class);
HttpEntity<Maybe<String>> entity = resolveValueWithEmptyBody(type);
@ -165,17 +151,6 @@ public class HttpEntityMethodArgumentResolverTests {
ResolvableType type = httpEntityType(Observable.class, String.class);
HttpEntity<Observable<String>> entity = resolveValueWithEmptyBody(type);
StepVerifier.create(RxReactiveStreams.toPublisher(entity.getBody()))
.expectNextCount(0)
.expectComplete()
.verify();
}
@Test
public void emptyBodyWithRxJava2Observable() {
ResolvableType type = httpEntityType(io.reactivex.Observable.class, String.class);
HttpEntity<io.reactivex.Observable<String>> entity = resolveValueWithEmptyBody(type);
StepVerifier.create(entity.getBody().toFlowable(BackpressureStrategy.BUFFER))
.expectNextCount(0)
.expectComplete()
@ -230,22 +205,12 @@ public class HttpEntityMethodArgumentResolverTests {
ResolvableType type = httpEntityType(Single.class, String.class);
HttpEntity<Single<String>> httpEntity = resolveValue(exchange, type);
assertThat(httpEntity.getHeaders()).isEqualTo(exchange.getRequest().getHeaders());
assertThat(httpEntity.getBody().toBlocking().value()).isEqualTo("line1");
}
@Test
public void httpEntityWithRxJava2SingleBody() {
ServerWebExchange exchange = postExchange("line1");
ResolvableType type = httpEntityType(io.reactivex.Single.class, String.class);
HttpEntity<io.reactivex.Single<String>> httpEntity = resolveValue(exchange, type);
assertThat(httpEntity.getHeaders()).isEqualTo(exchange.getRequest().getHeaders());
assertThat(httpEntity.getBody().blockingGet()).isEqualTo("line1");
}
@Test
public void httpEntityWithRxJava2MaybeBody() {
public void httpEntityWithMaybeBody() {
ServerWebExchange exchange = postExchange("line1");
ResolvableType type = httpEntityType(Maybe.class, String.class);
HttpEntity<Maybe<String>> httpEntity = resolveValue(exchange, type);
@ -335,10 +300,8 @@ public class HttpEntityMethodArgumentResolverTests {
HttpEntity<Mono<String>> monoBody,
HttpEntity<Flux<String>> fluxBody,
HttpEntity<Single<String>> singleBody,
HttpEntity<io.reactivex.Single<String>> rxJava2SingleBody,
HttpEntity<Maybe<String>> rxJava2MaybeBody,
HttpEntity<Maybe<String>> maybeBody,
HttpEntity<Observable<String>> observableBody,
HttpEntity<io.reactivex.Observable<String>> rxJava2ObservableBody,
HttpEntity<Flowable<String>> flowableBody,
HttpEntity<CompletableFuture<String>> completableFutureBody,
RequestEntity<String> requestEntity,

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,15 +29,15 @@ import java.util.concurrent.CompletableFuture;
import javax.xml.bind.annotation.XmlRootElement;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import rx.Observable;
import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
@ -140,21 +140,11 @@ public class MessageReaderArgumentResolverTests {
MethodParameter param = this.testMethod.arg(type);
Single<TestBean> single = resolveValue(param, body);
assertThat(single.toBlocking().value()).isEqualTo(new TestBean("f1", "b1"));
}
@Test
public void rxJava2SingleTestBean() throws Exception {
String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
ResolvableType type = forClassWithGenerics(io.reactivex.Single.class, TestBean.class);
MethodParameter param = this.testMethod.arg(type);
io.reactivex.Single<TestBean> single = resolveValue(param, body);
assertThat(single.blockingGet()).isEqualTo(new TestBean("f1", "b1"));
}
@Test
public void rxJava2MaybeTestBean() throws Exception {
public void maybeTestBean() throws Exception {
String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
ResolvableType type = forClassWithGenerics(Maybe.class, TestBean.class);
MethodParameter param = this.testMethod.arg(type);
@ -170,16 +160,6 @@ public class MessageReaderArgumentResolverTests {
MethodParameter param = this.testMethod.arg(type);
Observable<?> observable = resolveValue(param, body);
assertThat(observable.toList().toBlocking().first()).isEqualTo(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")));
}
@Test
public void rxJava2ObservableTestBean() throws Exception {
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
ResolvableType type = forClassWithGenerics(io.reactivex.Observable.class, TestBean.class);
MethodParameter param = this.testMethod.arg(type);
io.reactivex.Observable<?> observable = resolveValue(param, body);
assertThat(observable.toList().blockingGet()).isEqualTo(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")));
}
@ -324,10 +304,8 @@ public class MessageReaderArgumentResolverTests {
@Validated Mono<TestBean> monoTestBean,
@Validated Flux<TestBean> fluxTestBean,
Single<TestBean> singleTestBean,
io.reactivex.Single<TestBean> rxJava2SingleTestBean,
Maybe<TestBean> rxJava2MaybeTestBean,
Maybe<TestBean> maybeTestBean,
Observable<TestBean> observableTestBean,
io.reactivex.Observable<TestBean> rxJava2ObservableTestBean,
Flowable<TestBean> flowableTestBean,
CompletableFuture<TestBean> futureTestBean,
TestBean testBean,

View File

@ -27,13 +27,13 @@ import java.util.List;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.reactivex.Flowable;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import rx.Completable;
import rx.Observable;
import org.springframework.core.MethodParameter;
import org.springframework.core.codec.ByteBufferEncoder;
@ -117,11 +117,11 @@ public class MessageWriterResultHandlerTests {
testVoid(Completable.complete(), on(TestController.class).resolveReturnType(Completable.class));
testVoid(Observable.empty(), on(TestController.class).resolveReturnType(Observable.class, Void.class));
MethodParameter type = on(TestController.class).resolveReturnType(io.reactivex.Completable.class);
testVoid(io.reactivex.Completable.complete(), type);
MethodParameter type = on(TestController.class).resolveReturnType(Completable.class);
testVoid(Completable.complete(), type);
type = on(TestController.class).resolveReturnType(io.reactivex.Observable.class, Void.class);
testVoid(io.reactivex.Observable.empty(), type);
type = on(TestController.class).resolveReturnType(Observable.class, Void.class);
testVoid(Observable.empty(), type);
type = on(TestController.class).resolveReturnType(Flowable.class, Void.class);
testVoid(Flowable.empty(), type);
@ -274,14 +274,10 @@ public class MessageWriterResultHandlerTests {
Completable completable() { return null; }
io.reactivex.Completable rxJava2Completable() { return null; }
Flux<Void> fluxVoid() { return null; }
Observable<Void> observableVoid() { return null; }
io.reactivex.Observable<Void> rxJava2ObservableVoid() { return null; }
Flowable<Void> flowableVoid() { return null; }
OutputStream outputStream() { return null; }

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,12 +21,11 @@ import java.time.Duration;
import java.util.Map;
import java.util.function.Function;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@ -134,7 +133,7 @@ public class ModelAttributeMethodArgumentResolverTests {
testBindFoo("fooSingle", parameter, single -> {
boolean condition = single instanceof Single;
assertThat(condition).as(single.getClass().getName()).isTrue();
Object value = ((Single<?>) single).toBlocking().value();
Object value = ((Single<?>) single).blockingGet();
assertThat(value.getClass()).isEqualTo(Foo.class);
return (Foo) value;
});
@ -257,7 +256,7 @@ public class ModelAttributeMethodArgumentResolverTests {
assertThat(value).isNotNull();
boolean condition = value instanceof Single;
assertThat(condition).isTrue();
return Mono.from(RxReactiveStreams.toPublisher((Single<?>) value));
return Mono.from(((Single<?>) value).toFlowable());
});
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -23,10 +23,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import rx.Single;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.core.MethodIntrospector;
@ -122,7 +122,7 @@ public class ModelInitializerTests {
assertThat(((Mono<TestBean>) value).block(TIMEOUT).getName()).isEqualTo("Mono Bean");
value = model.get("singleBean");
assertThat(((Single<TestBean>) value).toBlocking().value().getName()).isEqualTo("Single Bean");
assertThat(((Single<TestBean>) value).blockingGet().getName()).isEqualTo("Single Bean");
value = model.get("voidMethodBean");
assertThat(((TestBean) value).getName()).isEqualTo("Void Method Bean");

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,7 @@ package org.springframework.web.reactive.result.method.annotation;
import java.security.Principal;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,7 +19,7 @@ package org.springframework.web.reactive.result.method.annotation;
import java.time.Duration;
import java.util.Optional;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,15 +22,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import io.reactivex.Maybe;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@ -150,14 +150,14 @@ public class RequestBodyMethodArgumentResolverTests {
public void emptyBodyWithSingle() {
MethodParameter param = this.testMethod.annot(requestBody()).arg(Single.class, String.class);
Single<String> single = resolveValueWithEmptyBody(param);
StepVerifier.create(RxReactiveStreams.toPublisher(single))
StepVerifier.create(single.toFlowable())
.expectNextCount(0)
.expectError(ServerWebInputException.class)
.verify();
param = this.testMethod.annot(requestBody().notRequired()).arg(Single.class, String.class);
single = resolveValueWithEmptyBody(param);
StepVerifier.create(RxReactiveStreams.toPublisher(single))
StepVerifier.create(single.toFlowable())
.expectNextCount(0)
.expectError(ServerWebInputException.class)
.verify();
@ -184,14 +184,14 @@ public class RequestBodyMethodArgumentResolverTests {
public void emptyBodyWithObservable() {
MethodParameter param = this.testMethod.annot(requestBody()).arg(Observable.class, String.class);
Observable<String> observable = resolveValueWithEmptyBody(param);
StepVerifier.create(RxReactiveStreams.toPublisher(observable))
StepVerifier.create(observable.toFlowable(BackpressureStrategy.BUFFER))
.expectNextCount(0)
.expectError(ServerWebInputException.class)
.verify();
param = this.testMethod.annot(requestBody().notRequired()).arg(Observable.class, String.class);
observable = resolveValueWithEmptyBody(param);
StepVerifier.create(RxReactiveStreams.toPublisher(observable))
StepVerifier.create(observable.toFlowable(BackpressureStrategy.BUFFER))
.expectNextCount(0)
.expectComplete()
.verify();
@ -248,19 +248,15 @@ public class RequestBodyMethodArgumentResolverTests {
@RequestBody Mono<String> mono,
@RequestBody Flux<String> flux,
@RequestBody Single<String> single,
@RequestBody io.reactivex.Single<String> rxJava2Single,
@RequestBody Maybe<String> rxJava2Maybe,
@RequestBody Observable<String> obs,
@RequestBody io.reactivex.Observable<String> rxjava2Obs,
@RequestBody Maybe<String> maybe,
@RequestBody Observable<String> observable,
@RequestBody CompletableFuture<String> future,
@RequestBody(required = false) String stringNotRequired,
@RequestBody(required = false) Mono<String> monoNotRequired,
@RequestBody(required = false) Flux<String> fluxNotRequired,
@RequestBody(required = false) Single<String> singleNotRequired,
@RequestBody(required = false) io.reactivex.Single<String> rxJava2SingleNotRequired,
@RequestBody(required = false) Maybe<String> rxJava2MaybeNotRequired,
@RequestBody(required = false) Observable<String> obsNotRequired,
@RequestBody(required = false) io.reactivex.Observable<String> rxjava2ObsNotRequired,
@RequestBody(required = false) Maybe<String> maybeNotRequired,
@RequestBody(required = false) Observable<String> observableNotRequired,
@RequestBody(required = false) CompletableFuture<String> futureNotRequired,
@RequestBody(required = false) Map<?, ?> mapNotRequired,
String notAnnotated) {}

View File

@ -26,14 +26,14 @@ import java.util.concurrent.CompletableFuture;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.Single;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@ -118,15 +118,7 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
startServer(httpServer);
String expected = "Hello!";
assertThat(performGet("/raw-response/observable", new HttpHeaders(), String.class).getBody()).isEqualTo(expected);
}
@ParameterizedHttpServerTest
public void byteBufferResponseBodyWithRxJava2Observable(HttpServer httpServer) throws Exception {
startServer(httpServer);
String expected = "Hello!";
assertThat(performGet("/raw-response/rxjava2-observable",
assertThat(performGet("/raw-response/observable",
new HttpHeaders(), String.class).getBody()).isEqualTo(expected);
}
@ -317,18 +309,10 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
}
@ParameterizedHttpServerTest
public void personTransformWithRxJava2Single(HttpServer httpServer) throws Exception {
public void personTransformWithMaybe(HttpServer httpServer) throws Exception {
startServer(httpServer);
assertThat(performPost("/person-transform/rxjava2-single", JSON, new Person("Robert"),
JSON, Person.class).getBody()).isEqualTo(new Person("ROBERT"));
}
@ParameterizedHttpServerTest
public void personTransformWithRxJava2Maybe(HttpServer httpServer) throws Exception {
startServer(httpServer);
assertThat(performPost("/person-transform/rxjava2-maybe", JSON, new Person("Robert"),
assertThat(performPost("/person-transform/maybe", JSON, new Person("Robert"),
JSON, Person.class).getBody()).isEqualTo(new Person("ROBERT"));
}
@ -359,15 +343,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
assertThat(performPost("/person-transform/observable", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res);
}
@ParameterizedHttpServerTest
public void personTransformWithRxJava2Observable(HttpServer httpServer) throws Exception {
startServer(httpServer);
List<?> req = asList(new Person("Robert"), new Person("Marie"));
List<?> res = asList(new Person("ROBERT"), new Person("MARIE"));
assertThat(performPost("/person-transform/rxjava2-observable", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res);
}
@ParameterizedHttpServerTest
public void personTransformWithFlowable(HttpServer httpServer) throws Exception {
startServer(httpServer);
@ -421,17 +396,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(1);
}
@ParameterizedHttpServerTest
public void personCreateWithRxJava2Single(HttpServer httpServer) throws Exception {
startServer(httpServer);
ResponseEntity<Void> entity = performPost(
"/person-create/rxjava2-single", JSON, new Person("Robert"), null, Void.class);
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(1);
}
@ParameterizedHttpServerTest
public void personCreateWithFluxJson(HttpServer httpServer) throws Exception {
startServer(httpServer);
@ -465,17 +429,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(2);
}
@ParameterizedHttpServerTest
public void personCreateWithRxJava2ObservableJson(HttpServer httpServer) throws Exception {
startServer(httpServer);
ResponseEntity<Void> entity = performPost("/person-create/rxjava2-observable", JSON,
asList(new Person("Robert"), new Person("Marie")), null, Void.class);
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(2);
}
@ParameterizedHttpServerTest
public void personCreateWithObservableXml(HttpServer httpServer) throws Exception {
startServer(httpServer);
@ -487,18 +440,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(2);
}
@ParameterizedHttpServerTest
public void personCreateWithRxJava2ObservableXml(HttpServer httpServer) throws Exception {
startServer(httpServer);
People people = new People(new Person("Robert"), new Person("Marie"));
String url = "/person-create/rxjava2-observable";
ResponseEntity<Void> response = performPost(url, APPLICATION_XML, people, null, Void.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(2);
}
@ParameterizedHttpServerTest
public void personCreateWithFlowableJson(HttpServer httpServer) throws Exception {
startServer(httpServer);
@ -567,11 +508,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
return Observable.just(ByteBuffer.wrap("Hello!".getBytes()));
}
@GetMapping("/rxjava2-observable")
public io.reactivex.Observable<ByteBuffer> getRxJava2Observable() {
return io.reactivex.Observable.just(ByteBuffer.wrap("Hello!".getBytes()));
}
@GetMapping("/flowable")
public Flowable<ByteBuffer> getFlowable() {
return Flowable.just(ByteBuffer.wrap("Hello!".getBytes()));
@ -670,9 +606,8 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
}
@PostMapping("/completable-future")
public CompletableFuture<Person> transformCompletableFuture(
@RequestBody CompletableFuture<Person> personFuture) {
return personFuture.thenApply(person -> new Person(person.getName().toUpperCase()));
public CompletableFuture<Person> transformCompletableFuture(@RequestBody CompletableFuture<Person> future) {
return future.thenApply(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/mono")
@ -685,21 +620,14 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/rxjava2-single")
public io.reactivex.Single<Person> transformRxJava2Single(@RequestBody io.reactivex.Single<Person> personFuture) {
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/rxjava2-maybe")
public Maybe<Person> transformRxJava2Maybe(@RequestBody Maybe<Person> personFuture) {
@PostMapping("/maybe")
public Maybe<Person> transformMaybe(@RequestBody Maybe<Person> personFuture) {
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/publisher")
public Publisher<Person> transformPublisher(@RequestBody Publisher<Person> persons) {
return Flux
.from(persons)
.map(person -> new Person(person.getName().toUpperCase()));
return Flux.from(persons).map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/flux")
@ -712,11 +640,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
return persons.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/rxjava2-observable")
public io.reactivex.Observable<Person> transformObservable(@RequestBody io.reactivex.Observable<Person> persons) {
return persons.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/flowable")
public Flowable<Person> transformFlowable(@RequestBody Flowable<Person> persons) {
return persons.map(person -> new Person(person.getName().toUpperCase()));
@ -743,11 +666,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
@PostMapping("/single")
public Completable createWithSingle(@RequestBody Single<Person> single) {
return single.map(persons::add).toCompletable();
}
@PostMapping("/rxjava2-single")
public io.reactivex.Completable createWithRxJava2Single(@RequestBody io.reactivex.Single<Person> single) {
return single.map(persons::add).ignoreElement();
}
@ -757,19 +675,12 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
}
@PostMapping("/observable")
public Observable<Void> createWithObservable(@RequestBody Observable<Person> observable) {
return observable.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());
}
@PostMapping("/rxjava2-observable")
public io.reactivex.Completable createWithRxJava2Observable(
@RequestBody io.reactivex.Observable<Person> observable) {
public Completable createWithObservable(@RequestBody Observable<Person> observable) {
return observable.toList().doOnSuccess(persons::addAll).ignoreElement();
}
@PostMapping("/flowable")
public io.reactivex.Completable createWithFlowable(@RequestBody Flowable<Person> flowable) {
public Completable createWithFlowable(@RequestBody Flowable<Person> flowable) {
return flowable.toList().doOnSuccess(persons::addAll).ignoreElement();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,11 +20,11 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Single;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;

View File

@ -28,13 +28,13 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import rx.Completable;
import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,7 +15,7 @@
*/
package org.springframework.web.reactive.result.method.annotation;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,8 +21,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

View File

@ -26,11 +26,11 @@ import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import io.reactivex.rxjava3.core.Completable;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import rx.Completable;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.core.MethodParameter;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,7 +19,7 @@ package org.springframework.web.reactive.function.server
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import io.reactivex.Flowable
import io.reactivex.rxjava3.core.Flowable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat

View File

@ -59,9 +59,7 @@ dependencies {
testCompile("org.hibernate:hibernate-validator")
testCompile("javax.validation:validation-api")
testCompile("io.projectreactor:reactor-core")
testCompile("io.reactivex:rxjava")
testCompile("io.reactivex:rxjava-reactive-streams")
testCompile("io.reactivex.rxjava2:rxjava")
testCompile("io.reactivex.rxjava3:rxjava")
testCompile("org.jetbrains.kotlin:kotlin-script-runtime")
testRuntime("org.jetbrains.kotlin:kotlin-scripting-jsr223-embeddable")
testRuntime("org.jruby:jruby")

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,14 +26,14 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleEmitter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import rx.Single;
import rx.SingleEmitter;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@ -98,7 +98,6 @@ public class ReactiveTypeHandlerTests {
public void supportsType() throws Exception {
assertThat(this.handler.isReactiveType(Mono.class)).isTrue();
assertThat(this.handler.isReactiveType(Single.class)).isTrue();
assertThat(this.handler.isReactiveType(io.reactivex.Single.class)).isTrue();
}
@Test
@ -117,16 +116,10 @@ public class ReactiveTypeHandlerTests {
MonoProcessor<String> monoEmpty = MonoProcessor.create();
testDeferredResultSubscriber(monoEmpty, Mono.class, forClass(String.class), monoEmpty::onComplete, null);
// RxJava 1 Single
AtomicReference<SingleEmitter<String>> ref = new AtomicReference<>();
Single<String> single = Single.fromEmitter(ref::set);
testDeferredResultSubscriber(single, Single.class, forClass(String.class),
() -> ref.get().onSuccess("foo"), "foo");
// RxJava 2 Single
AtomicReference<io.reactivex.SingleEmitter<String>> ref2 = new AtomicReference<>();
io.reactivex.Single<String> single2 = io.reactivex.Single.create(ref2::set);
testDeferredResultSubscriber(single2, io.reactivex.Single.class, forClass(String.class),
// RxJava Single
AtomicReference<SingleEmitter<String>> ref2 = new AtomicReference<>();
Single<String> single2 = Single.create(ref2::set);
testDeferredResultSubscriber(single2, Single.class, forClass(String.class),
() -> ref2.get().onSuccess("foo"), "foo");
}
@ -162,15 +155,10 @@ public class ReactiveTypeHandlerTests {
MonoProcessor<String> mono = MonoProcessor.create();
testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onError(ex), ex);
// RxJava 1 Single
AtomicReference<SingleEmitter<String>> ref = new AtomicReference<>();
Single<String> single = Single.fromEmitter(ref::set);
testDeferredResultSubscriber(single, Single.class, forClass(String.class), () -> ref.get().onError(ex), ex);
// RxJava 2 Single
AtomicReference<io.reactivex.SingleEmitter<String>> ref2 = new AtomicReference<>();
io.reactivex.Single<String> single2 = io.reactivex.Single.create(ref2::set);
testDeferredResultSubscriber(single2, io.reactivex.Single.class, forClass(String.class),
// RxJava Single
AtomicReference<SingleEmitter<String>> ref2 = new AtomicReference<>();
Single<String> single2 = Single.create(ref2::set);
testDeferredResultSubscriber(single2, Single.class, forClass(String.class),
() -> ref2.get().onError(ex), ex);
}
@ -343,8 +331,6 @@ public class ReactiveTypeHandlerTests {
Single<String> handleSingle() { return null; }
io.reactivex.Single<String> handleSingleRxJava2() { return null; }
Flux<Bar> handleFlux() { return null; }
Flux<String> handleFluxString() { return null; }