diff --git a/spring-test/src/main/java/org/springframework/mock/web/reactive/function/server/MockServerRequest.java b/spring-test/src/main/java/org/springframework/mock/web/reactive/function/server/MockServerRequest.java index e871be13c0d..4f4ea161a62 100644 --- a/spring-test/src/main/java/org/springframework/mock/web/reactive/function/server/MockServerRequest.java +++ b/spring-test/src/main/java/org/springframework/mock/web/reactive/function/server/MockServerRequest.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -149,6 +150,13 @@ public class MockServerRequest implements ServerRequest { return (Mono) this.body; } + @Override + @SuppressWarnings("unchecked") + public Mono bodyToMono(ParameterizedTypeReference typeReference) { + Assert.state(this.body != null, "No body"); + return (Mono) this.body; + } + @Override @SuppressWarnings("unchecked") public Flux bodyToFlux(Class elementClass) { @@ -156,6 +164,13 @@ public class MockServerRequest implements ServerRequest { return (Flux) this.body; } + @Override + @SuppressWarnings("unchecked") + public Flux bodyToFlux(ParameterizedTypeReference typeReference) { + Assert.state(this.body != null, "No body"); + return (Flux) this.body; + } + @Override public Map attributes() { return this.attributes; diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequest.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequest.java index 5240d033e79..59a94ddca34 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequest.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequest.java @@ -24,6 +24,7 @@ import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.client.reactive.ClientHttpRequest; @@ -189,6 +190,17 @@ public interface ClientRequest { */ > Builder body(P publisher, Class elementClass); + /** + * Set the body of the request to the given {@code Publisher} and return it. + * @param publisher the {@code Publisher} to write to the request + * @param typeReference a type reference describing the elements contained in the publisher + * @param the type of the elements contained in the publisher + * @param

the type of the {@code Publisher} + * @return the built request + */ + > Builder body(P publisher, + ParameterizedTypeReference typeReference); + /** * Set the attribute with the given name to the given value. * @param name the name of the attribute to add diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilder.java index a50c4d4c4ae..a13205d0b65 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilder.java @@ -27,6 +27,7 @@ import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -106,6 +107,17 @@ class DefaultClientRequestBuilder implements ClientRequest.Builder { return this; } + @Override + public > ClientRequest.Builder body(P publisher, + ParameterizedTypeReference typeReference) { + + Assert.notNull(publisher, "'publisher' must not be null"); + Assert.notNull(typeReference, "'typeReference' must not be null"); + + this.inserter = BodyInserters.fromPublisher(publisher, typeReference); + return this; + } + @Override public ClientRequest.Builder attribute(String name, Object value) { this.attributes.put(name, value); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java index a395e638a4b..39e43829713 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java @@ -32,6 +32,7 @@ import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -139,12 +140,24 @@ class DefaultServerRequest implements ServerRequest { return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER); } + @Override + public Mono bodyToMono(ParameterizedTypeReference typeReference) { + Mono mono = body(BodyExtractors.toMono(typeReference)); + return mono.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER); + } + @Override public Flux bodyToFlux(Class elementClass) { Flux flux = body(BodyExtractors.toFlux(elementClass)); return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER); } + @Override + public Flux bodyToFlux(ParameterizedTypeReference typeReference) { + Flux flux = body(BodyExtractors.toFlux(typeReference)); + return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER); + } + @Override public Map attributes() { return this.exchange.getAttributes(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerResponseBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerResponseBuilder.java index ee19094188d..0f3d1dc3a0f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerResponseBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerResponseBuilder.java @@ -17,9 +17,7 @@ package org.springframework.web.reactive.function.server; import java.net.URI; -import java.time.ZoneId; import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashSet; @@ -33,6 +31,7 @@ import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.CacheControl; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -183,6 +182,21 @@ class DefaultServerResponseBuilder implements ServerResponse.BodyBuilder { .map(entityResponse -> entityResponse); } + @Override + public > Mono body(P publisher, + ParameterizedTypeReference typeReference) { + + Assert.notNull(publisher, "'publisher' must not be null"); + Assert.notNull(typeReference, "'typeReference' must not be null"); + + return new DefaultEntityResponseBuilder<>(publisher, + BodyInserters.fromPublisher(publisher, typeReference)) + .headers(this.headers) + .status(this.statusCode) + .build() + .map(entityResponse -> entityResponse); + } + @Override public Mono syncBody(Object body) { Assert.notNull(body, "'body' must not be null"); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/RequestPredicates.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/RequestPredicates.java index 8e880060323..0f90bfd704e 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/RequestPredicates.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/RequestPredicates.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpCookie; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; @@ -515,11 +516,21 @@ public abstract class RequestPredicates { return this.request.bodyToMono(elementClass); } + @Override + public Mono bodyToMono(ParameterizedTypeReference typeReference) { + return this.request.bodyToMono(typeReference); + } + @Override public Flux bodyToFlux(Class elementClass) { return this.request.bodyToFlux(elementClass); } + @Override + public Flux bodyToFlux(ParameterizedTypeReference typeReference) { + return this.request.bodyToFlux(typeReference); + } + @Override public Optional attribute(String name) { return this.request.attribute(name); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/ServerRequest.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/ServerRequest.java index 463a7224358..6ebf0abb5a0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/ServerRequest.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/ServerRequest.java @@ -29,6 +29,7 @@ import java.util.OptionalLong; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -118,6 +119,14 @@ public interface ServerRequest { */ Mono bodyToMono(Class elementClass); + /** + * Extract the body to a {@code Mono}. + * @param typeReference a type reference describing the expected response request type + * @param the element type + * @return a mono containing the body of the given type {@code T} + */ + Mono bodyToMono(ParameterizedTypeReference typeReference); + /** * Extract the body to a {@code Flux}. * @param elementClass the class of element in the {@code Flux} @@ -126,6 +135,14 @@ public interface ServerRequest { */ Flux bodyToFlux(Class elementClass); + /** + * Extract the body to a {@code Flux}. + * @param typeReference a type reference describing the expected request body type + * @param the element type + * @return a flux containing the body of the given type {@code T} + */ + Flux bodyToFlux(ParameterizedTypeReference typeReference); + /** * Return the request attribute value if present. * @param name the attribute name diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/ServerResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/ServerResponse.java index a340d2c57a2..22b2dbb27c4 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/ServerResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/ServerResponse.java @@ -28,6 +28,7 @@ import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.CacheControl; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -352,6 +353,19 @@ public interface ServerResponse { */ > Mono body(P publisher, Class elementClass); + /** + * Set the body of the response to the given asynchronous {@code Publisher} and return it. + * This convenience method combines {@link #body(BodyInserter)} and + * {@link BodyInserters#fromPublisher(Publisher, Class)}. + * @param publisher the {@code Publisher} to write to the response + * @param typeReference a type reference describing the elements contained in the publisher + * @param the type of the elements contained in the publisher + * @param

the type of the {@code Publisher} + * @return the built response + */ + > Mono body(P publisher, + ParameterizedTypeReference typeReference); + /** * Set the body of the response to the given synchronous {@code Object} and return it. * This convenience method combines {@link #body(BodyInserter)} and diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/support/ServerRequestWrapper.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/support/ServerRequestWrapper.java index 8510bb90b0c..e612df531ec 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/support/ServerRequestWrapper.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/support/ServerRequestWrapper.java @@ -29,6 +29,7 @@ import java.util.OptionalLong; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -118,11 +119,21 @@ public class ServerRequestWrapper implements ServerRequest { return this.delegate.bodyToMono(elementClass); } + @Override + public Mono bodyToMono(ParameterizedTypeReference typeReference) { + return this.delegate.bodyToMono(typeReference); + } + @Override public Flux bodyToFlux(Class elementClass) { return this.delegate.bodyToFlux(elementClass); } + @Override + public Flux bodyToFlux(ParameterizedTypeReference typeReference) { + return this.delegate.bodyToFlux(typeReference); + } + @Override public Optional attribute(String name) { return this.delegate.attribute(name); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilderTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilderTests.java index 88e57163e76..6901adbc08d 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilderTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilderTests.java @@ -17,14 +17,15 @@ package org.springframework.web.reactive.function.client; import java.net.URI; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.junit.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -101,8 +102,7 @@ public class DefaultClientRequestBuilderTests { BodyInserter inserter = (response, strategies) -> { byte[] bodyBytes = body.getBytes(UTF_8); - ByteBuffer byteBuffer = ByteBuffer.wrap(bodyBytes); - DataBuffer buffer = new DefaultDataBufferFactory().wrap(byteBuffer); + DataBuffer buffer = new DefaultDataBufferFactory().wrap(bodyBytes); return response.writeWith(Mono.just(buffer)); }; @@ -119,6 +119,55 @@ public class DefaultClientRequestBuilderTests { MockClientHttpRequest request = new MockClientHttpRequest(GET, "/"); result.writeTo(request, strategies).block(); assertNotNull(request.getBody()); + + StepVerifier.create(request.getBody()) + .expectNextCount(1) + .verifyComplete(); + } + + @Test + public void bodyClass() throws Exception { + String body = "foo"; + Publisher publisher = Mono.just(body); + ClientRequest result = ClientRequest.method(POST, URI.create("http://example.com")) + .body(publisher, String.class).build(); + + List> messageWriters = new ArrayList<>(); + messageWriters.add(new EncoderHttpMessageWriter<>(CharSequenceEncoder.allMimeTypes())); + + ExchangeStrategies strategies = mock(ExchangeStrategies.class); + when(strategies.messageWriters()).thenReturn(messageWriters); + + MockClientHttpRequest request = new MockClientHttpRequest(GET, "/"); + result.writeTo(request, strategies).block(); + assertNotNull(request.getBody()); + + StepVerifier.create(request.getBody()) + .expectNextCount(1) + .verifyComplete(); + } + + @Test + public void bodyParameterizedTypeReference() throws Exception { + String body = "foo"; + Publisher publisher = Mono.just(body); + ParameterizedTypeReference typeReference = new ParameterizedTypeReference() {}; + ClientRequest result = ClientRequest.method(POST, URI.create("http://example.com")) + .body(publisher, typeReference).build(); + + List> messageWriters = new ArrayList<>(); + messageWriters.add(new EncoderHttpMessageWriter<>(CharSequenceEncoder.allMimeTypes())); + + ExchangeStrategies strategies = mock(ExchangeStrategies.class); + when(strategies.messageWriters()).thenReturn(messageWriters); + + MockClientHttpRequest request = new MockClientHttpRequest(GET, "/"); + result.writeTo(request, strategies).block(); + assertNotNull(request.getBody()); + + StepVerifier.create(request.getBody()) + .expectNextCount(1) + .verifyComplete(); } } \ No newline at end of file diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/DefaultServerRequestTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/DefaultServerRequestTests.java index 2461702d9a9..ab95851c074 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/DefaultServerRequestTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/DefaultServerRequestTests.java @@ -33,6 +33,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DefaultDataBuffer; @@ -50,7 +51,7 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.server.UnsupportedMediaTypeStatusException; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.springframework.web.reactive.function.BodyExtractors.toMono; /** @@ -238,6 +239,24 @@ public class DefaultServerRequestTests { assertEquals("foo", resultMono.block()); } + @Test + public void bodyToMonoParameterizedTypeReference() throws Exception { + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + Flux body = Flux.just(dataBuffer); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.TEXT_PLAIN); + MockServerHttpRequest mockRequest = MockServerHttpRequest.method(HttpMethod.GET, "http://example.com?foo=bar"). + headers(httpHeaders).body(body); + DefaultServerRequest request = new DefaultServerRequest(mockRequest.toExchange(), messageReaders); + + ParameterizedTypeReference typeReference = new ParameterizedTypeReference() {}; + Mono resultMono = request.bodyToMono(typeReference); + assertEquals("foo", resultMono.block()); + } + @Test public void bodyToFlux() throws Exception { DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); @@ -255,6 +274,24 @@ public class DefaultServerRequestTests { assertEquals(Collections.singletonList("foo"), resultFlux.collectList().block()); } + @Test + public void bodyToFluxParameterizedTypeReference() throws Exception { + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + Flux body = Flux.just(dataBuffer); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.TEXT_PLAIN); + MockServerHttpRequest mockRequest = MockServerHttpRequest.method(HttpMethod.GET, "http://example.com?foo=bar"). + headers(httpHeaders).body(body); + DefaultServerRequest request = new DefaultServerRequest(mockRequest.toExchange(), messageReaders); + + ParameterizedTypeReference typeReference = new ParameterizedTypeReference() {}; + Flux resultFlux = request.bodyToFlux(typeReference); + assertEquals(Collections.singletonList("foo"), resultFlux.collectList().block()); + } + @Test public void bodyUnacceptable() throws Exception { DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/MockServerRequest.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/MockServerRequest.java index f49dc02839b..721ad165c39 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/MockServerRequest.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/MockServerRequest.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -148,6 +149,13 @@ public class MockServerRequest implements ServerRequest { return (Mono) this.body; } + @Override + @SuppressWarnings("unchecked") + public Mono bodyToMono(ParameterizedTypeReference typeReference) { + Assert.state(this.body != null, "No body"); + return (Mono) this.body; + } + @Override @SuppressWarnings("unchecked") public Flux bodyToFlux(Class elementClass) { @@ -155,6 +163,13 @@ public class MockServerRequest implements ServerRequest { return (Flux) this.body; } + @Override + @SuppressWarnings("unchecked") + public Flux bodyToFlux(ParameterizedTypeReference typeReference) { + Assert.state(this.body != null, "No body"); + return (Flux) this.body; + } + @Override public Map attributes() { return this.attributes;