Add Mono and Flux support in @RequestMapping handler methods

This commit is contained in:
Sebastien Deleuze 2016-01-11 13:02:36 +01:00
parent 1faeb0ec87
commit ae4b35ced7
2 changed files with 97 additions and 5 deletions

View File

@ -76,7 +76,14 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
if (this.conversionService.canConvert(Publisher.class, type.getRawClass())) {
return Mono.just(this.conversionService.convert(elementFlux, type.getRawClass()));
}
else if (type.getRawClass() == Flux.class) {
return Mono.just(elementFlux);
}
else if (type.getRawClass() == Mono.class) {
return Mono.just(Mono.from(elementFlux));
}
// TODO Currently manage only "Foo" parameter, not "List<Foo>" parameters, Stéphane is going to add toIterable/toIterator to Flux to support that use case
return elementFlux.next().map(o -> o);
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.Flux;
import reactor.Mono;
import reactor.io.buffer.Buffer;
import reactor.rx.Promise;
@ -61,7 +62,6 @@ import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.handler.SimpleHandlerResultHandler;
import org.springframework.web.server.WebToHttpHandlerAdapter;
import org.springframework.web.server.WebToHttpHandlerBuilder;
import static org.junit.Assert.assertEquals;
@ -113,7 +113,19 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
}
@Test
public void rawHelloResponse() throws Exception {
public void rawFluxResponse() throws Exception {
RestTemplate restTemplate = new RestTemplate();
URI url = new URI("http://localhost:" + port + "/raw-flux");
RequestEntity<Void> request = RequestEntity.get(url).build();
ResponseEntity<String> response = restTemplate.exchange(request, String.class);
assertEquals("Hello!", response.getBody());
}
@Test
public void rawObservableResponse() throws Exception {
RestTemplate restTemplate = new RestTemplate();
@ -158,6 +170,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
serializeAsPojo("http://localhost:" + port + "/completable-future");
}
@Test
public void serializeAsMono() throws Exception {
serializeAsPojo("http://localhost:" + port + "/mono");
}
@Test
public void serializeAsSingle() throws Exception {
serializeAsPojo("http://localhost:" + port + "/single");
@ -178,6 +195,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
serializeAsCollection("http://localhost:" + port + "/publisher");
}
@Test
public void serializeAsFlux() throws Exception {
serializeAsCollection("http://localhost:" + port + "/flux");
}
@Test
public void serializeAsObservable() throws Exception {
serializeAsCollection("http://localhost:" + port + "/observable");
@ -193,6 +215,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
capitalizeCollection("http://localhost:" + port + "/publisher-capitalize");
}
@Test
public void fluxCapitalize() throws Exception {
capitalizeCollection("http://localhost:" + port + "/flux-capitalize");
}
@Test
public void observableCapitalize() throws Exception {
capitalizeCollection("http://localhost:" + port + "/observable-capitalize");
@ -213,6 +240,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
capitalizePojo("http://localhost:" + port + "/completable-future-capitalize");
}
@Test
public void monoCapitalize() throws Exception {
capitalizePojo("http://localhost:" + port + "/mono-capitalize");
}
@Test
public void singleCapitalize() throws Exception {
capitalizePojo("http://localhost:" + port + "/single-capitalize");
@ -228,6 +260,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
create("http://localhost:" + this.port + "/publisher-create");
}
@Test
public void fluxCreate() throws Exception {
create("http://localhost:" + this.port + "/flux-create");
}
@Test
public void streamCreate() throws Exception {
create("http://localhost:" + this.port + "/stream-create");
@ -384,12 +421,24 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON);
}
@RequestMapping("/raw-flux")
@ResponseBody
public Flux<ByteBuffer> rawFluxResponseBody() {
return Flux.just(Buffer.wrap("Hello!").byteBuffer());
}
@RequestMapping("/raw-observable")
@ResponseBody
public Observable<ByteBuffer> rawObservableResponseBody() {
return Observable.just(Buffer.wrap("Hello!").byteBuffer());
}
@RequestMapping("/mono")
@ResponseBody
public Mono<Person> monoResponseBody() {
return Mono.just(new Person("Robert"));
}
@RequestMapping("/single")
@ResponseBody
public Single<Person> singleResponseBody() {
@ -414,6 +463,12 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
return Stream.just(new Person("Robert"), new Person("Marie"));
}
@RequestMapping("/flux")
@ResponseBody
public Flux<Person> fluxResponseBody() {
return Flux.just(new Person("Robert"), new Person("Marie"));
}
@RequestMapping("/observable")
@ResponseBody
public Observable<Person> observableResponseBody() {
@ -435,6 +490,15 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
});
}
@RequestMapping("/flux-capitalize")
@ResponseBody
public Flux<Person> fluxCapitalize(@RequestBody Flux<Person> persons) {
return persons.map(person -> {
person.setName(person.getName().toUpperCase());
return person;
});
}
@RequestMapping("/observable-capitalize")
@ResponseBody
public Observable<Person> observableCapitalize(@RequestBody Observable<Person> persons) {
@ -471,6 +535,15 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
});
}
@RequestMapping("/mono-capitalize")
@ResponseBody
public Mono<Person> monoCapitalize(@RequestBody Mono<Person> personFuture) {
return personFuture.map(person -> {
person.setName(person.getName().toUpperCase());
return person;
});
}
@RequestMapping("/single-capitalize")
@ResponseBody
public Single<Person> singleCapitalize(@RequestBody Single<Person> personFuture) {
@ -491,12 +564,17 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/publisher-create")
public Publisher<Void> publisherCreate(@RequestBody Publisher<Person> personStream) {
return Stream.from(personStream).toList().doOnSuccess(persons::addAll).after();
return Flux.from(personStream).doOnNext(persons::add).after();
}
@RequestMapping("/flux-create")
public Mono<Void> fluxCreate(@RequestBody Flux<Person> personStream) {
return personStream.doOnNext(persons::add).after();
}
@RequestMapping("/stream-create")
public Publisher<Void> streamCreate(@RequestBody Stream<Person> personStream) {
return Stream.from(personStream.toList().doOnSuccess(persons::addAll).after()).promise();
return personStream.toList().doOnSuccess(persons::addAll).after();
}
@RequestMapping("/observable-create")
@ -519,7 +597,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@ExceptionHandler
@ResponseBody
public Publisher<String> handleException(IllegalStateException ex) {
return Stream.just("Recovered from error: " + ex.getMessage());
return Mono.just("Recovered from error: " + ex.getMessage());
}
//TODO add mixed and T request mappings tests
@ -562,6 +640,13 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
public int hashCode() {
return this.name != null ? this.name.hashCode() : 0;
}
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
'}';
}
}
}