From ae4b35ced75e8e2882640d5645d482a77a38d19d Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Mon, 11 Jan 2016 13:02:36 +0100 Subject: [PATCH] Add Mono and Flux support in @RequestMapping handler methods --- .../RequestBodyArgumentResolver.java | 7 ++ .../RequestMappingIntegrationTests.java | 95 ++++++++++++++++++- 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java index 3e5c01306b..2ad7e92b1e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java @@ -76,7 +76,14 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve if (this.conversionService.canConvert(Publisher.class, type.getRawClass())) { return Mono.just(this.conversionService.convert(elementFlux, type.getRawClass())); } + else if (type.getRawClass() == Flux.class) { + return Mono.just(elementFlux); + } + else if (type.getRawClass() == Mono.class) { + return Mono.just(Mono.from(elementFlux)); + } + // TODO Currently manage only "Foo" parameter, not "List" parameters, Stéphane is going to add toIterable/toIterator to Flux to support that use case return elementFlux.next().map(o -> o); } diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java index 1175641212..1de455fe7c 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import org.junit.Test; import org.reactivestreams.Publisher; +import reactor.Flux; import reactor.Mono; import reactor.io.buffer.Buffer; import reactor.rx.Promise; @@ -61,7 +62,6 @@ import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.client.RestTemplate; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.handler.SimpleHandlerResultHandler; -import org.springframework.web.server.WebToHttpHandlerAdapter; import org.springframework.web.server.WebToHttpHandlerBuilder; import static org.junit.Assert.assertEquals; @@ -113,7 +113,19 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati } @Test - public void rawHelloResponse() throws Exception { + public void rawFluxResponse() throws Exception { + + RestTemplate restTemplate = new RestTemplate(); + + URI url = new URI("http://localhost:" + port + "/raw-flux"); + RequestEntity request = RequestEntity.get(url).build(); + ResponseEntity response = restTemplate.exchange(request, String.class); + + assertEquals("Hello!", response.getBody()); + } + + @Test + public void rawObservableResponse() throws Exception { RestTemplate restTemplate = new RestTemplate(); @@ -158,6 +170,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati serializeAsPojo("http://localhost:" + port + "/completable-future"); } + @Test + public void serializeAsMono() throws Exception { + serializeAsPojo("http://localhost:" + port + "/mono"); + } + @Test public void serializeAsSingle() throws Exception { serializeAsPojo("http://localhost:" + port + "/single"); @@ -178,6 +195,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati serializeAsCollection("http://localhost:" + port + "/publisher"); } + @Test + public void serializeAsFlux() throws Exception { + serializeAsCollection("http://localhost:" + port + "/flux"); + } + @Test public void serializeAsObservable() throws Exception { serializeAsCollection("http://localhost:" + port + "/observable"); @@ -193,6 +215,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati capitalizeCollection("http://localhost:" + port + "/publisher-capitalize"); } + @Test + public void fluxCapitalize() throws Exception { + capitalizeCollection("http://localhost:" + port + "/flux-capitalize"); + } + @Test public void observableCapitalize() throws Exception { capitalizeCollection("http://localhost:" + port + "/observable-capitalize"); @@ -213,6 +240,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati capitalizePojo("http://localhost:" + port + "/completable-future-capitalize"); } + @Test + public void monoCapitalize() throws Exception { + capitalizePojo("http://localhost:" + port + "/mono-capitalize"); + } + @Test public void singleCapitalize() throws Exception { capitalizePojo("http://localhost:" + port + "/single-capitalize"); @@ -228,6 +260,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati create("http://localhost:" + this.port + "/publisher-create"); } + @Test + public void fluxCreate() throws Exception { + create("http://localhost:" + this.port + "/flux-create"); + } + @Test public void streamCreate() throws Exception { create("http://localhost:" + this.port + "/stream-create"); @@ -384,12 +421,24 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON); } + @RequestMapping("/raw-flux") + @ResponseBody + public Flux rawFluxResponseBody() { + return Flux.just(Buffer.wrap("Hello!").byteBuffer()); + } + @RequestMapping("/raw-observable") @ResponseBody public Observable rawObservableResponseBody() { return Observable.just(Buffer.wrap("Hello!").byteBuffer()); } + @RequestMapping("/mono") + @ResponseBody + public Mono monoResponseBody() { + return Mono.just(new Person("Robert")); + } + @RequestMapping("/single") @ResponseBody public Single singleResponseBody() { @@ -414,6 +463,12 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati return Stream.just(new Person("Robert"), new Person("Marie")); } + @RequestMapping("/flux") + @ResponseBody + public Flux fluxResponseBody() { + return Flux.just(new Person("Robert"), new Person("Marie")); + } + @RequestMapping("/observable") @ResponseBody public Observable observableResponseBody() { @@ -435,6 +490,15 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati }); } + @RequestMapping("/flux-capitalize") + @ResponseBody + public Flux fluxCapitalize(@RequestBody Flux persons) { + return persons.map(person -> { + person.setName(person.getName().toUpperCase()); + return person; + }); + } + @RequestMapping("/observable-capitalize") @ResponseBody public Observable observableCapitalize(@RequestBody Observable persons) { @@ -471,6 +535,15 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati }); } + @RequestMapping("/mono-capitalize") + @ResponseBody + public Mono monoCapitalize(@RequestBody Mono personFuture) { + return personFuture.map(person -> { + person.setName(person.getName().toUpperCase()); + return person; + }); + } + @RequestMapping("/single-capitalize") @ResponseBody public Single singleCapitalize(@RequestBody Single personFuture) { @@ -491,12 +564,17 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @RequestMapping("/publisher-create") public Publisher publisherCreate(@RequestBody Publisher personStream) { - return Stream.from(personStream).toList().doOnSuccess(persons::addAll).after(); + return Flux.from(personStream).doOnNext(persons::add).after(); + } + + @RequestMapping("/flux-create") + public Mono fluxCreate(@RequestBody Flux personStream) { + return personStream.doOnNext(persons::add).after(); } @RequestMapping("/stream-create") public Publisher streamCreate(@RequestBody Stream personStream) { - return Stream.from(personStream.toList().doOnSuccess(persons::addAll).after()).promise(); + return personStream.toList().doOnSuccess(persons::addAll).after(); } @RequestMapping("/observable-create") @@ -519,7 +597,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @ExceptionHandler @ResponseBody public Publisher handleException(IllegalStateException ex) { - return Stream.just("Recovered from error: " + ex.getMessage()); + return Mono.just("Recovered from error: " + ex.getMessage()); } //TODO add mixed and T request mappings tests @@ -562,6 +640,13 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati public int hashCode() { return this.name != null ? this.name.hashCode() : 0; } + + @Override + public String toString() { + return "Person{" + + "name='" + name + '\'' + + '}'; + } } }