Add support for RxJava 2 Maybe type

Issue: SPR-14843
This commit is contained in:
Sebastien Deleuze 2016-10-25 16:41:25 +02:00
parent 20dec61d04
commit 2075932780
6 changed files with 77 additions and 2 deletions

View File

@ -25,6 +25,7 @@ import java.util.function.Predicate;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -43,6 +44,7 @@ import org.springframework.util.ClassUtils;
* registered via {@link #registerFluxAdapter} and {@link #registerMonoAdapter}.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @since 5.0
*/
public class ReactiveAdapterRegistry {
@ -296,6 +298,11 @@ public class ReactiveAdapterRegistry {
source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle(),
new ReactiveAdapter.Descriptor(false, false, false)
);
registry.registerMonoAdapter(Maybe.class,
source -> Mono.from(((Maybe<?>) source).toFlowable()),
source -> Flowable.fromPublisher(source).toObservable().singleElement(),
new ReactiveAdapter.Descriptor(false, true, false)
);
registry.registerMonoAdapter(io.reactivex.Completable.class,
source -> Mono.from(((io.reactivex.Completable) source).toFlowable()),
source -> Flowable.fromPublisher(source).toObservable().ignoreElements(),

View File

@ -18,6 +18,7 @@ package org.springframework.core.convert.support;
import java.util.concurrent.CompletableFuture;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
@ -59,6 +60,7 @@ public class ReactiveAdapterRegistryTests {
testFluxAdapter(Flowable.class);
testFluxAdapter(io.reactivex.Observable.class);
testMonoAdapter(io.reactivex.Single.class);
testMonoAdapter(Maybe.class);
testMonoAdapter(io.reactivex.Completable.class);
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
@ -98,6 +99,7 @@ public class HttpEntityArgumentResolverTests {
testSupports(httpEntityType(forClassWithGenerics(Mono.class, String.class)));
testSupports(httpEntityType(forClassWithGenerics(Single.class, String.class)));
testSupports(httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class)));
testSupports(httpEntityType(forClassWithGenerics(Maybe.class, String.class)));
testSupports(httpEntityType(forClassWithGenerics(CompletableFuture.class, String.class)));
testSupports(httpEntityType(forClassWithGenerics(Flux.class, String.class)));
testSupports(httpEntityType(forClassWithGenerics(Observable.class, String.class)));
@ -161,6 +163,17 @@ public class HttpEntityArgumentResolverTests {
.verify(entity.getBody().toFlowable());
}
@Test
public void emptyBodyWithRxJava2Maybe() throws Exception {
ResolvableType type = httpEntityType(forClassWithGenerics(Maybe.class, String.class));
HttpEntity<Maybe<String>> entity = resolveValueWithEmptyBody(type);
ScriptedSubscriber
.create().expectNextCount(0)
.expectComplete()
.verify(entity.getBody().toFlowable());
}
@Test
public void emptyBodyWithObservable() throws Exception {
ResolvableType type = httpEntityType(forClassWithGenerics(Observable.class, String.class));
@ -244,6 +257,16 @@ public class HttpEntityArgumentResolverTests {
assertEquals("line1", httpEntity.getBody().blockingGet());
}
@Test
public void httpEntityWithRxJava2MaybeBody() throws Exception {
String body = "line1";
ResolvableType type = httpEntityType(forClassWithGenerics(Maybe.class, String.class));
HttpEntity<Maybe<String>> httpEntity = resolveValue(type, body);
assertEquals(this.request.getHeaders(), httpEntity.getHeaders());
assertEquals("line1", httpEntity.getBody().blockingGet());
}
@Test
public void httpEntityWithCompletableFutureBody() throws Exception {
String body = "line1";
@ -337,7 +360,8 @@ public class HttpEntityArgumentResolverTests {
HttpEntity<Mono<String>> monoBody,
HttpEntity<Flux<String>> fluxBody,
HttpEntity<Single<String>> singleBody,
HttpEntity<io.reactivex.Single<String>> xJava2SingleBody,
HttpEntity<io.reactivex.Single<String>> rxJava2SingleBody,
HttpEntity<Maybe<String>> rxJava2MaybeBody,
HttpEntity<Observable<String>> observableBody,
HttpEntity<io.reactivex.Observable<String>> rxJava2ObservableBody,
HttpEntity<Flowable<String>> flowableBody,

View File

@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import javax.xml.bind.annotation.XmlRootElement;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
@ -159,6 +160,16 @@ public class MessageReaderArgumentResolverTests {
assertEquals(new TestBean("f1", "b1"), single.blockingGet());
}
@Test
public void rxJava2MaybeTestBean() throws Exception {
String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
ResolvableType type = forClassWithGenerics(Maybe.class, TestBean.class);
MethodParameter param = this.testMethod.resolveParam(type);
Maybe<TestBean> maybe = resolveValue(param, body);
assertEquals(new TestBean("f1", "b1"), maybe.blockingGet());
}
@Test
public void observableTestBean() throws Exception {
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
@ -316,6 +327,7 @@ public class MessageReaderArgumentResolverTests {
@Validated Flux<TestBean> fluxTestBean,
Single<TestBean> singleTestBean,
io.reactivex.Single<TestBean> rxJava2SingleTestBean,
Maybe<TestBean> rxJava2MaybeTestBean,
Observable<TestBean> observableTestBean,
io.reactivex.Observable<TestBean> rxJava2ObservableTestBean,
Flowable<TestBean> flowableTestBean,

View File

@ -22,12 +22,12 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import io.reactivex.Maybe;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.subscriber.ScriptedSubscriber;
import rx.Completable;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
@ -162,6 +162,21 @@ public class RequestBodyArgumentResolverTests {
.verify(RxReactiveStreams.toPublisher(single));
}
@Test
public void emptyBodyWithMaybe() throws Exception {
ResolvableType type = forClassWithGenerics(Maybe.class, String.class);
Maybe<String> maybe = resolveValueWithEmptyBody(type, true);
ScriptedSubscriber.<String>create().expectNextCount(0)
.expectError(ServerWebInputException.class)
.verify(maybe.toFlowable());
maybe = resolveValueWithEmptyBody(type, false);
ScriptedSubscriber.<String>create().expectNextCount(0)
.expectComplete()
.verify(maybe.toFlowable());
}
@Test
public void emptyBodyWithObservable() throws Exception {
ResolvableType type = forClassWithGenerics(Observable.class, String.class);
@ -239,6 +254,7 @@ public class RequestBodyArgumentResolverTests {
@RequestBody Flux<String> flux,
@RequestBody Single<String> single,
@RequestBody io.reactivex.Single<String> rxJava2Single,
@RequestBody Maybe<String> rxJava2Maybe,
@RequestBody Observable<String> obs,
@RequestBody io.reactivex.Observable<String> rxjava2Obs,
@RequestBody CompletableFuture<String> future,
@ -247,6 +263,7 @@ public class RequestBodyArgumentResolverTests {
@RequestBody(required = false) Flux<String> fluxNotRequired,
@RequestBody(required = false) Single<String> singleNotRequired,
@RequestBody(required = false) io.reactivex.Single<String> rxJava2SingleNotRequired,
@RequestBody(required = false) Maybe<String> rxJava2MaybeNotRequired,
@RequestBody(required = false) Observable<String> obsNotRequired,
@RequestBody(required = false) io.reactivex.Observable<String> rxjava2ObsNotRequired,
@RequestBody(required = false) CompletableFuture<String> futureNotRequired,

View File

@ -26,6 +26,7 @@ import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@ -218,6 +219,13 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
JSON, Person.class).getBody());
}
@Test
public void personTransformWithRxJava2Maybe() throws Exception {
assertEquals(new Person("ROBERT"),
performPost("/person-transform/rxjava2-maybe", JSON, new Person("Robert"),
JSON, Person.class).getBody());
}
@Test
public void personTransformWithPublisher() throws Exception {
List<?> req = asList(new Person("Robert"), new Person("Marie"));
@ -509,6 +517,11 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/rxjava2-maybe")
public Maybe<Person> transformRxJava2Maybe(@RequestBody Maybe<Person> personFuture) {
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/publisher")
public Publisher<Person> transformPublisher(@RequestBody Publisher<Person> persons) {
return Flux