Add support for RxJava 2
This commit adds support for RxJava 2 Completable, Single, Observable and Flowable types (io.reactivex package). Issue: SPR-14628
This commit is contained in:
parent
b4641b2306
commit
48d67a245b
|
@ -80,6 +80,7 @@ configure(allprojects) { project ->
|
|||
ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT'
|
||||
ext.romeVersion = "1.6.0"
|
||||
ext.rxjavaVersion = '1.1.9'
|
||||
ext.rxjava2Version = '2.0.0-RC1'
|
||||
ext.rxnettyVersion = '0.5.2-rc.3'
|
||||
ext.servletVersion = "3.1.0"
|
||||
ext.slf4jVersion = "1.7.21"
|
||||
|
@ -389,6 +390,7 @@ project("spring-core") {
|
|||
optional("org.reactivestreams:reactive-streams:${reactivestreamsVersion}")
|
||||
optional("io.projectreactor:reactor-core:${reactorCoreVersion}")
|
||||
optional "io.reactivex:rxjava:${rxjavaVersion}"
|
||||
optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
|
||||
optional("io.netty:netty-buffer:${nettyVersion}")
|
||||
testCompile("javax.xml.bind:jaxb-api:${jaxbVersion}")
|
||||
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
|
||||
|
@ -721,6 +723,7 @@ project("spring-web") {
|
|||
exclude group: 'io.reactivex', module: 'rxjava'
|
||||
}
|
||||
optional("io.reactivex:rxjava:${rxjavaVersion}")
|
||||
optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
|
||||
optional("io.undertow:undertow-core:${undertowVersion}")
|
||||
optional("org.jboss.xnio:xnio-api:${xnioVersion}")
|
||||
optional("io.netty:netty-buffer:${nettyVersion}") // Temporarily for JsonObjectDecoder
|
||||
|
@ -797,6 +800,7 @@ project("spring-web-reactive") {
|
|||
exclude group: 'io.reactivex', module: 'rxjava'
|
||||
}
|
||||
testCompile("io.reactivex:rxjava:${rxjavaVersion}")
|
||||
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
|
||||
testCompile("io.undertow:undertow-core:${undertowVersion}")
|
||||
testCompile("org.jboss.xnio:xnio-api:${xnioVersion}")
|
||||
testCompile("com.fasterxml:aalto-xml:1.0.0")
|
||||
|
|
|
@ -23,8 +23,11 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import io.reactivex.BackpressureStrategy;
|
||||
import io.reactivex.Flowable;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.adapter.RxJava1Adapter;
|
||||
import reactor.adapter.RxJava2Adapter;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import rx.Completable;
|
||||
|
@ -48,6 +51,9 @@ public class ReactiveAdapterRegistry {
|
|||
private static final boolean rxJava1Present =
|
||||
ClassUtils.isPresent("rx.Observable", ReactiveAdapterRegistry.class.getClassLoader());
|
||||
|
||||
private static final boolean rxJava2Present =
|
||||
ClassUtils.isPresent("io.reactivex.Flowable", ReactiveAdapterRegistry.class.getClassLoader());
|
||||
|
||||
private final Map<Class<?>, ReactiveAdapter> adapterMap = new LinkedHashMap<>(4);
|
||||
|
||||
|
||||
|
@ -72,6 +78,9 @@ public class ReactiveAdapterRegistry {
|
|||
if (rxJava1Present) {
|
||||
new RxJava1AdapterRegistrar().register(this);
|
||||
}
|
||||
if (rxJava2Present) {
|
||||
new RxJava2AdapterRegistrar().register(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -269,4 +278,28 @@ public class ReactiveAdapterRegistry {
|
|||
}
|
||||
}
|
||||
|
||||
private static class RxJava2AdapterRegistrar {
|
||||
|
||||
public void register(ReactiveAdapterRegistry registry) {
|
||||
registry.registerFluxAdapter(Flowable.class,
|
||||
source -> RxJava2Adapter.flowableToFlux((Flowable<?>) source),
|
||||
RxJava2Adapter::fluxToFlowable
|
||||
);
|
||||
registry.registerFluxAdapter(io.reactivex.Observable.class,
|
||||
source -> RxJava2Adapter.observableToFlux((io.reactivex.Observable<?>) source, BackpressureStrategy.BUFFER),
|
||||
RxJava2Adapter::fluxToObservable
|
||||
);
|
||||
registry.registerMonoAdapter(io.reactivex.Single.class,
|
||||
source -> RxJava2Adapter.singleToMono((io.reactivex.Single<?>) source),
|
||||
RxJava2Adapter::monoToSingle,
|
||||
new ReactiveAdapter.Descriptor(false, false, false)
|
||||
);
|
||||
registry.registerMonoAdapter(io.reactivex.Completable.class,
|
||||
source -> RxJava2Adapter.completableToMono((io.reactivex.Completable) source),
|
||||
RxJava2Adapter::monoToCompletable,
|
||||
new ReactiveAdapter.Descriptor(false, true, true)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ package org.springframework.core.convert.support;
|
|||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
@ -55,6 +56,10 @@ public class ReactiveAdapterRegistryTests {
|
|||
testFluxAdapter(Observable.class);
|
||||
testMonoAdapter(Single.class);
|
||||
testMonoAdapter(Completable.class);
|
||||
testFluxAdapter(Flowable.class);
|
||||
testFluxAdapter(io.reactivex.Observable.class);
|
||||
testMonoAdapter(io.reactivex.Single.class);
|
||||
testMonoAdapter(io.reactivex.Completable.class);
|
||||
}
|
||||
|
||||
private void testFluxAdapter(Class<?> adapteeType) {
|
||||
|
|
|
@ -23,9 +23,12 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import io.reactivex.BackpressureStrategy;
|
||||
import io.reactivex.Flowable;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import reactor.adapter.RxJava1Adapter;
|
||||
import reactor.adapter.RxJava2Adapter;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import rx.Observable;
|
||||
|
@ -67,6 +70,7 @@ import static org.springframework.core.ResolvableType.forClassWithGenerics;
|
|||
* {@link MessageReaderArgumentResolverTests}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
public class HttpEntityArgumentResolverTests {
|
||||
|
||||
|
@ -98,9 +102,12 @@ public class HttpEntityArgumentResolverTests {
|
|||
testSupports(httpEntityType(String.class));
|
||||
testSupports(httpEntityType(forClassWithGenerics(Mono.class, String.class)));
|
||||
testSupports(httpEntityType(forClassWithGenerics(Single.class, String.class)));
|
||||
testSupports(httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class)));
|
||||
testSupports(httpEntityType(forClassWithGenerics(CompletableFuture.class, String.class)));
|
||||
testSupports(httpEntityType(forClassWithGenerics(Flux.class, String.class)));
|
||||
testSupports(httpEntityType(forClassWithGenerics(Observable.class, String.class)));
|
||||
testSupports(httpEntityType(forClassWithGenerics(io.reactivex.Observable.class, String.class)));
|
||||
testSupports(httpEntityType(forClassWithGenerics(Flowable.class, String.class)));
|
||||
testSupports(forClassWithGenerics(RequestEntity.class, String.class));
|
||||
}
|
||||
|
||||
|
@ -153,6 +160,16 @@ public class HttpEntityArgumentResolverTests {
|
|||
.assertError(ServerWebInputException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyBodyWithRxJava2Single() throws Exception {
|
||||
ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class));
|
||||
HttpEntity<io.reactivex.Single<String>> entity = resolveValueWithEmptyBody(type);
|
||||
|
||||
TestSubscriber.subscribe(RxJava2Adapter.singleToMono(entity.getBody()))
|
||||
.assertNoValues()
|
||||
.assertError(ServerWebInputException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyBodyWithObservable() throws Exception {
|
||||
ResolvableType type = httpEntityType(forClassWithGenerics(Observable.class, String.class));
|
||||
|
@ -164,6 +181,28 @@ public class HttpEntityArgumentResolverTests {
|
|||
.assertNoValues();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyBodyWithRxJava2Observable() throws Exception {
|
||||
ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Observable.class, String.class));
|
||||
HttpEntity<io.reactivex.Observable<String>> entity = resolveValueWithEmptyBody(type);
|
||||
|
||||
TestSubscriber.subscribe(RxJava2Adapter.observableToFlux(entity.getBody(), BackpressureStrategy.BUFFER))
|
||||
.assertNoError()
|
||||
.assertComplete()
|
||||
.assertNoValues();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyBodyWithFlowable() throws Exception {
|
||||
ResolvableType type = httpEntityType(forClassWithGenerics(Flowable.class, String.class));
|
||||
HttpEntity<Flowable<String>> entity = resolveValueWithEmptyBody(type);
|
||||
|
||||
TestSubscriber.subscribe(RxJava2Adapter.flowableToFlux(entity.getBody()))
|
||||
.assertNoError()
|
||||
.assertComplete()
|
||||
.assertNoValues();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyBodyWithCompletableFuture() throws Exception {
|
||||
ResolvableType type = httpEntityType(forClassWithGenerics(CompletableFuture.class, String.class));
|
||||
|
@ -205,6 +244,16 @@ public class HttpEntityArgumentResolverTests {
|
|||
assertEquals("line1", httpEntity.getBody().toBlocking().value());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void httpEntityWithRxJava2SingleBody() throws Exception {
|
||||
String body = "line1";
|
||||
ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class));
|
||||
HttpEntity<io.reactivex.Single<String>> httpEntity = resolveValue(type, body);
|
||||
|
||||
assertEquals(this.request.getHeaders(), httpEntity.getHeaders());
|
||||
assertEquals("line1", httpEntity.getBody().blockingGet());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void httpEntityWithCompletableFutureBody() throws Exception {
|
||||
String body = "line1";
|
||||
|
@ -295,7 +344,10 @@ public class HttpEntityArgumentResolverTests {
|
|||
HttpEntity<Mono<String>> monoBody,
|
||||
HttpEntity<Flux<String>> fluxBody,
|
||||
HttpEntity<Single<String>> singleBody,
|
||||
HttpEntity<io.reactivex.Single<String>> xJava2SingleBody,
|
||||
HttpEntity<Observable<String>> observableBody,
|
||||
HttpEntity<io.reactivex.Observable<String>> rxJava2ObservableBody,
|
||||
HttpEntity<Flowable<String>> flowableBody,
|
||||
HttpEntity<CompletableFuture<String>> completableFutureBody,
|
||||
RequestEntity<String> requestEntity) {}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -144,6 +145,16 @@ public class MessageReaderArgumentResolverTests {
|
|||
assertEquals(new TestBean("f1", "b1"), single.toBlocking().value());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void rxJava2SingleTestBean() throws Exception {
|
||||
String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
|
||||
ResolvableType type = forClassWithGenerics(io.reactivex.Single.class, TestBean.class);
|
||||
MethodParameter param = this.testMethod.resolveParam(type);
|
||||
io.reactivex.Single<TestBean> single = resolveValue(param, body);
|
||||
|
||||
assertEquals(new TestBean("f1", "b1"), single.blockingGet());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void observableTestBean() throws Exception {
|
||||
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
|
||||
|
@ -155,6 +166,28 @@ public class MessageReaderArgumentResolverTests {
|
|||
observable.toList().toBlocking().first());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void rxJava2ObservableTestBean() throws Exception {
|
||||
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
|
||||
ResolvableType type = forClassWithGenerics(io.reactivex.Observable.class, TestBean.class);
|
||||
MethodParameter param = this.testMethod.resolveParam(type);
|
||||
io.reactivex.Observable<?> observable = resolveValue(param, body);
|
||||
|
||||
assertEquals(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")),
|
||||
observable.toList().blockingFirst());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void flowableTestBean() throws Exception {
|
||||
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
|
||||
ResolvableType type = forClassWithGenerics(Flowable.class, TestBean.class);
|
||||
MethodParameter param = this.testMethod.resolveParam(type);
|
||||
Flowable<?> flowable = resolveValue(param, body);
|
||||
|
||||
assertEquals(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")),
|
||||
flowable.toList().blockingFirst());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void futureTestBean() throws Exception {
|
||||
String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
|
||||
|
@ -288,7 +321,10 @@ public class MessageReaderArgumentResolverTests {
|
|||
@Validated Mono<TestBean> monoTestBean,
|
||||
@Validated Flux<TestBean> fluxTestBean,
|
||||
Single<TestBean> singleTestBean,
|
||||
io.reactivex.Single<TestBean> rxJava2SingleTestBean,
|
||||
Observable<TestBean> observableTestBean,
|
||||
io.reactivex.Observable<TestBean> rxJava2ObservableTestBean,
|
||||
Flowable<TestBean> flowableTestBean,
|
||||
CompletableFuture<TestBean> futureTestBean,
|
||||
TestBean testBean,
|
||||
Map<String, String> map,
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.List;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.reactivex.Flowable;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -112,8 +113,11 @@ public class MessageWriterResultHandlerTests {
|
|||
testVoidReturnType(null, ResolvableType.forType(void.class));
|
||||
testVoidReturnType(Mono.empty(), ResolvableType.forClassWithGenerics(Mono.class, Void.class));
|
||||
testVoidReturnType(Completable.complete(), ResolvableType.forClass(Completable.class));
|
||||
testVoidReturnType(io.reactivex.Completable.complete(), ResolvableType.forClass(io.reactivex.Completable.class));
|
||||
testVoidReturnType(Flux.empty(), ResolvableType.forClassWithGenerics(Flux.class, Void.class));
|
||||
testVoidReturnType(Observable.empty(), ResolvableType.forClassWithGenerics(Observable.class, Void.class));
|
||||
testVoidReturnType(io.reactivex.Observable.empty(), ResolvableType.forClassWithGenerics(io.reactivex.Observable.class, Void.class));
|
||||
testVoidReturnType(Flowable.empty(), ResolvableType.forClassWithGenerics(Flowable.class, Void.class));
|
||||
}
|
||||
|
||||
private void testVoidReturnType(Object body, ResolvableType type) {
|
||||
|
@ -273,10 +277,16 @@ public class MessageWriterResultHandlerTests {
|
|||
|
||||
Completable completable() { return null; }
|
||||
|
||||
io.reactivex.Completable rxJava2Completable() { return null; }
|
||||
|
||||
Flux<Void> fluxVoid() { return null; }
|
||||
|
||||
Observable<Void> observableVoid() { return null; }
|
||||
|
||||
io.reactivex.Observable<Void> rxJava2ObservableVoid() { return null; }
|
||||
|
||||
Flowable<Void> flowableVoid() { return null; }
|
||||
|
||||
OutputStream outputStream() { return null; }
|
||||
|
||||
List<ParentClass> listParentClass() { return null; }
|
||||
|
|
|
@ -250,13 +250,17 @@ public class RequestBodyArgumentResolverTests {
|
|||
@RequestBody Mono<String> mono,
|
||||
@RequestBody Flux<String> flux,
|
||||
@RequestBody Single<String> single,
|
||||
@RequestBody io.reactivex.Single<String> rxJava2Single,
|
||||
@RequestBody Observable<String> obs,
|
||||
@RequestBody io.reactivex.Observable<String> rxjava2Obs,
|
||||
@RequestBody CompletableFuture<String> future,
|
||||
@RequestBody(required = false) String stringNotRequired,
|
||||
@RequestBody(required = false) Mono<String> monoNotRequired,
|
||||
@RequestBody(required = false) Flux<String> fluxNotRequired,
|
||||
@RequestBody(required = false) Single<String> singleNotRequired,
|
||||
@RequestBody(required = false) io.reactivex.Single<String> rxJava2SingleNotRequired,
|
||||
@RequestBody(required = false) Observable<String> obsNotRequired,
|
||||
@RequestBody(required = false) io.reactivex.Observable<String> rxjava2ObsNotRequired,
|
||||
@RequestBody(required = false) CompletableFuture<String> futureNotRequired,
|
||||
String notAnnotated) {}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
|
|||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -103,6 +104,18 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
assertEquals(expected, performGet("/raw-response/observable", null, String.class).getBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void byteBufferResponseBodyWithRxJava2Observable() throws Exception {
|
||||
String expected = "Hello!";
|
||||
assertEquals(expected, performGet("/raw-response/rxjava2-observable", null, String.class).getBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void byteBufferResponseBodyWithFlowable() throws Exception {
|
||||
String expected = "Hello!";
|
||||
assertEquals(expected, performGet("/raw-response/flowable", null, String.class).getBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personResponseBody() throws Exception {
|
||||
Person expected = new Person("Robert");
|
||||
|
@ -196,6 +209,13 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
JSON, Person.class).getBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personTransformWithRxJava2Single() throws Exception {
|
||||
assertEquals(new Person("ROBERT"),
|
||||
performPost("/person-transform/rxjava2-single", JSON, new Person("Robert"),
|
||||
JSON, Person.class).getBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personTransformWithPublisher() throws Exception {
|
||||
List<?> req = asList(new Person("Robert"), new Person("Marie"));
|
||||
|
@ -217,6 +237,20 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
assertEquals(res, performPost("/person-transform/observable", JSON, req, JSON, PERSON_LIST).getBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personTransformWithRxJava2Observable() throws Exception {
|
||||
List<?> req = asList(new Person("Robert"), new Person("Marie"));
|
||||
List<?> res = asList(new Person("ROBERT"), new Person("MARIE"));
|
||||
assertEquals(res, performPost("/person-transform/rxjava2-observable", JSON, req, JSON, PERSON_LIST).getBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personTransformWithFlowable() throws Exception {
|
||||
List<?> req = asList(new Person("Robert"), new Person("Marie"));
|
||||
List<?> res = asList(new Person("ROBERT"), new Person("MARIE"));
|
||||
assertEquals(res, performPost("/person-transform/flowable", JSON, req, JSON, PERSON_LIST).getBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCreateWithPublisherJson() throws Exception {
|
||||
ResponseEntity<Void> entity = performPost("/person-create/publisher", JSON,
|
||||
|
@ -253,6 +287,15 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
assertEquals(1, getApplicationContext().getBean(PersonCreateController.class).persons.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCreateWithRxJava2Single() throws Exception {
|
||||
ResponseEntity<Void> entity = performPost(
|
||||
"/person-create/rxjava2-single", JSON, new Person("Robert"), null, Void.class);
|
||||
|
||||
assertEquals(HttpStatus.OK, entity.getStatusCode());
|
||||
assertEquals(1, getApplicationContext().getBean(PersonCreateController.class).persons.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCreateWithFluxJson() throws Exception {
|
||||
ResponseEntity<Void> entity = performPost("/person-create/flux", JSON,
|
||||
|
@ -280,6 +323,15 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
assertEquals(2, getApplicationContext().getBean(PersonCreateController.class).persons.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCreateWithRxJava2ObservableJson() throws Exception {
|
||||
ResponseEntity<Void> entity = performPost("/person-create/rxjava2-observable", JSON,
|
||||
asList(new Person("Robert"), new Person("Marie")), null, Void.class);
|
||||
|
||||
assertEquals(HttpStatus.OK, entity.getStatusCode());
|
||||
assertEquals(2, getApplicationContext().getBean(PersonCreateController.class).persons.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCreateWithObservableXml() throws Exception {
|
||||
People people = new People(new Person("Robert"), new Person("Marie"));
|
||||
|
@ -289,6 +341,33 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
assertEquals(2, getApplicationContext().getBean(PersonCreateController.class).persons.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCreateWithRxJava2ObservableXml() throws Exception {
|
||||
People people = new People(new Person("Robert"), new Person("Marie"));
|
||||
ResponseEntity<Void> response = performPost("/person-create/rxjava2-observable", APPLICATION_XML, people, null, Void.class);
|
||||
|
||||
assertEquals(HttpStatus.OK, response.getStatusCode());
|
||||
assertEquals(2, getApplicationContext().getBean(PersonCreateController.class).persons.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCreateWithFlowableJson() throws Exception {
|
||||
ResponseEntity<Void> entity = performPost("/person-create/flowable", JSON,
|
||||
asList(new Person("Robert"), new Person("Marie")), null, Void.class);
|
||||
|
||||
assertEquals(HttpStatus.OK, entity.getStatusCode());
|
||||
assertEquals(2, getApplicationContext().getBean(PersonCreateController.class).persons.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void personCreateWithFlowableXml() throws Exception {
|
||||
People people = new People(new Person("Robert"), new Person("Marie"));
|
||||
ResponseEntity<Void> response = performPost("/person-create/flowable", APPLICATION_XML, people, null, Void.class);
|
||||
|
||||
assertEquals(HttpStatus.OK, response.getStatusCode());
|
||||
assertEquals(2, getApplicationContext().getBean(PersonCreateController.class).persons.size());
|
||||
}
|
||||
|
||||
|
||||
@Configuration
|
||||
@ComponentScan(resourcePattern = "**/RequestMappingMessageConversionIntegrationTests$*.class")
|
||||
|
@ -319,6 +398,16 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
public Observable<ByteBuffer> getObservable() {
|
||||
return Observable.just(ByteBuffer.wrap("Hello!".getBytes()));
|
||||
}
|
||||
|
||||
@GetMapping("/rxjava2-observable")
|
||||
public io.reactivex.Observable<ByteBuffer> getRxJava2Observable() {
|
||||
return io.reactivex.Observable.just(ByteBuffer.wrap("Hello!".getBytes()));
|
||||
}
|
||||
|
||||
@GetMapping("/flowable")
|
||||
public Flowable<ByteBuffer> getFlowable() {
|
||||
return Flowable.just(ByteBuffer.wrap("Hello!".getBytes()));
|
||||
}
|
||||
}
|
||||
|
||||
@RestController
|
||||
|
@ -409,6 +498,11 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
|
||||
}
|
||||
|
||||
@PostMapping("/rxjava2-single")
|
||||
public io.reactivex.Single<Person> transformRxJava2Single(@RequestBody io.reactivex.Single<Person> personFuture) {
|
||||
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
|
||||
}
|
||||
|
||||
@PostMapping("/publisher")
|
||||
public Publisher<Person> transformPublisher(@RequestBody Publisher<Person> persons) {
|
||||
return Flux
|
||||
|
@ -425,6 +519,16 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
public Observable<Person> transformObservable(@RequestBody Observable<Person> persons) {
|
||||
return persons.map(person -> new Person(person.getName().toUpperCase()));
|
||||
}
|
||||
|
||||
@PostMapping("/rxjava2-observable")
|
||||
public io.reactivex.Observable<Person> transformObservable(@RequestBody io.reactivex.Observable<Person> persons) {
|
||||
return persons.map(person -> new Person(person.getName().toUpperCase()));
|
||||
}
|
||||
|
||||
@PostMapping("/flowable")
|
||||
public Flowable<Person> transformFlowable(@RequestBody Flowable<Person> persons) {
|
||||
return persons.map(person -> new Person(person.getName().toUpperCase()));
|
||||
}
|
||||
}
|
||||
|
||||
@RestController
|
||||
|
@ -449,6 +553,11 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
return single.map(persons::add).toCompletable();
|
||||
}
|
||||
|
||||
@PostMapping("/rxjava2-single")
|
||||
public io.reactivex.Completable createWithRxJava2Single(@RequestBody io.reactivex.Single<Person> single) {
|
||||
return single.map(persons::add).toCompletable();
|
||||
}
|
||||
|
||||
@PostMapping("/flux")
|
||||
public Mono<Void> createWithFlux(@RequestBody Flux<Person> flux) {
|
||||
return flux.doOnNext(persons::add).then();
|
||||
|
@ -458,6 +567,16 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
|
|||
public Observable<Void> createWithObservable(@RequestBody Observable<Person> observable) {
|
||||
return observable.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());
|
||||
}
|
||||
|
||||
@PostMapping("/rxjava2-observable")
|
||||
public io.reactivex.Observable<Void> createWithRxJava2Observable(@RequestBody io.reactivex.Observable<Person> observable) {
|
||||
return observable.toList().doOnNext(persons::addAll).flatMap(document -> io.reactivex.Observable.empty());
|
||||
}
|
||||
|
||||
@PostMapping("/flowable")
|
||||
public Flowable<Void> createWithFlowable(@RequestBody Flowable<Person> flowable) {
|
||||
return flowable.toList().doOnNext(persons::addAll).flatMap(document -> Flowable.empty());
|
||||
}
|
||||
}
|
||||
|
||||
@XmlRootElement @SuppressWarnings("WeakerAccess")
|
||||
|
|
Loading…
Reference in New Issue