Add bodyToMono/bodyToFlux convenience methods
This commit adds a bodyToMono and bodyToFlux convenience method to ClientResponse/ServerRequest, similar to the body(Publisher) method that is on ClientRequest/ServerResponse.
This commit is contained in:
parent
2a279b7064
commit
fa9cc1eb1a
|
@ -27,13 +27,18 @@ import java.util.OptionalLong;
|
|||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpRange;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.codec.BodyExtractor;
|
||||
import org.springframework.http.codec.BodyExtractors;
|
||||
import org.springframework.http.codec.HttpMessageReader;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
/**
|
||||
|
@ -73,6 +78,7 @@ class DefaultServerRequest implements ServerRequest {
|
|||
|
||||
@Override
|
||||
public <T> T body(BodyExtractor<T, ? super ServerHttpRequest> extractor) {
|
||||
Assert.notNull(extractor, "'extractor' must not be null");
|
||||
return extractor.extract(request(),
|
||||
new BodyExtractor.Context() {
|
||||
@Override
|
||||
|
@ -82,6 +88,16 @@ class DefaultServerRequest implements ServerRequest {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
|
||||
return body(BodyExtractors.toMono(elementClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
|
||||
return body(BodyExtractors.toFlux(elementClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Optional<T> attribute(String name) {
|
||||
return this.exchange.getAttribute(name);
|
||||
|
|
|
@ -35,7 +35,6 @@ import reactor.core.publisher.Flux;
|
|||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.Conventions;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.http.CacheControl;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
|
@ -168,11 +167,6 @@ class DefaultServerResponseBuilder implements ServerResponse.BodyBuilder {
|
|||
return body(BodyInserters.fromPublisher(publisher, elementClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S extends Publisher<T>, T> ServerResponse<S> body(S publisher, ResolvableType elementType) {
|
||||
return body(BodyInserters.fromPublisher(publisher, elementType));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerResponse<Rendering> render(String name, Object... modelAttributes) {
|
||||
Assert.hasLength(name, "'name' must not be empty");
|
||||
|
|
|
@ -25,6 +25,9 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.codec.BodyExtractor;
|
||||
|
@ -320,6 +323,16 @@ public abstract class RequestPredicates {
|
|||
return this.request.body(extractor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
|
||||
return this.request.bodyToMono(elementClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
|
||||
return this.request.bodyToFlux(elementClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Optional<T> attribute(String name) {
|
||||
return this.request.attribute(name);
|
||||
|
|
|
@ -24,6 +24,9 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpRange;
|
||||
|
@ -71,6 +74,22 @@ public interface ServerRequest {
|
|||
*/
|
||||
<T> T body(BodyExtractor<T, ? super ServerHttpRequest> extractor);
|
||||
|
||||
/**
|
||||
* Extract the body to a {@code Mono}.
|
||||
* @param elementClass the class of element in the {@code Mono}
|
||||
* @param <T> the element type
|
||||
* @return the body as a mono
|
||||
*/
|
||||
<T> Mono<T> bodyToMono(Class<? extends T> elementClass);
|
||||
|
||||
/**
|
||||
* Extract the body to a {@code Flux}.
|
||||
* @param elementClass the class of element in the {@code Flux}
|
||||
* @param <T> the element type
|
||||
* @return the body as a flux
|
||||
*/
|
||||
<T> Flux<T> bodyToFlux(Class<? extends T> elementClass);
|
||||
|
||||
/**
|
||||
* Return the request attribute value if present.
|
||||
* @param name the attribute name
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.Set;
|
|||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.http.CacheControl;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
|
@ -329,18 +328,6 @@ public interface ServerResponse<T> {
|
|||
*/
|
||||
<S extends Publisher<T>, T> ServerResponse<S> body(S publisher, Class<T> elementClass);
|
||||
|
||||
/**
|
||||
* Set the body of the response to the given {@code Publisher} and return it. This
|
||||
* convenience method combines {@link #body(BodyInserter)} and
|
||||
* {@link BodyInserters#fromPublisher(Publisher, ResolvableType)}.
|
||||
* @param publisher the {@code Publisher} to write to the response
|
||||
* @param elementType the type of elements contained in the publisher
|
||||
* @param <T> the type of the elements contained in the publisher
|
||||
* @param <S> the type of the {@code Publisher}
|
||||
* @return the built request
|
||||
*/
|
||||
<S extends Publisher<T>, T> ServerResponse<S> body(S publisher, ResolvableType elementType);
|
||||
|
||||
/**
|
||||
* Render the template with the given {@code name} using the given {@code modelAttributes}.
|
||||
* The model attributes are mapped under a
|
||||
|
|
|
@ -24,6 +24,9 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpRange;
|
||||
|
@ -88,6 +91,16 @@ public class ServerRequestWrapper implements ServerRequest {
|
|||
return this.request.body(extractor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
|
||||
return this.request.bodyToMono(elementClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
|
||||
return this.request.bodyToFlux(elementClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Optional<T> attribute(String name) {
|
||||
return this.request.attribute(name);
|
||||
|
|
|
@ -185,4 +185,45 @@ public class DefaultServerRequestTests {
|
|||
assertEquals("foo", resultMono.block());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bodyToMono() throws Exception {
|
||||
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
|
||||
DefaultDataBuffer dataBuffer =
|
||||
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
|
||||
Flux<DataBuffer> body = Flux.just(dataBuffer);
|
||||
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
|
||||
when(mockRequest.getHeaders()).thenReturn(httpHeaders);
|
||||
when(mockRequest.getBody()).thenReturn(body);
|
||||
|
||||
Set<HttpMessageReader<?>> messageReaders = Collections
|
||||
.singleton(new DecoderHttpMessageReader<String>(new StringDecoder()));
|
||||
when(mockHandlerStrategies.messageReaders()).thenReturn(messageReaders::stream);
|
||||
|
||||
Mono<String> resultMono = defaultRequest.bodyToMono(String.class);
|
||||
assertEquals("foo", resultMono.block());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bodyToFlux() throws Exception {
|
||||
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
|
||||
DefaultDataBuffer dataBuffer =
|
||||
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
|
||||
Flux<DataBuffer> body = Flux.just(dataBuffer);
|
||||
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
|
||||
when(mockRequest.getHeaders()).thenReturn(httpHeaders);
|
||||
when(mockRequest.getBody()).thenReturn(body);
|
||||
|
||||
Set<HttpMessageReader<?>> messageReaders = Collections
|
||||
.singleton(new DecoderHttpMessageReader<String>(new StringDecoder()));
|
||||
when(mockHandlerStrategies.messageReaders()).thenReturn(messageReaders::stream);
|
||||
|
||||
Flux<String> resultFlux = defaultRequest.bodyToFlux(String.class);
|
||||
Mono<List<String>> result = resultFlux.collectList();
|
||||
assertEquals(Collections.singletonList("foo"), result.block());
|
||||
}
|
||||
|
||||
}
|
|
@ -29,6 +29,9 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpRange;
|
||||
|
@ -96,6 +99,18 @@ public class MockServerRequest<T> implements ServerRequest {
|
|||
return (S) this.body;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <S> Mono<S> bodyToMono(Class<? extends S> elementClass) {
|
||||
return (Mono<S>) this.body;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <S> Flux<S> bodyToFlux(Class<? extends S> elementClass) {
|
||||
return (Flux<S>) this.body;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <S> Optional<S> attribute(String name) {
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.time.ZonedDateTime;
|
|||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
|
@ -318,16 +317,6 @@ public interface ClientRequest<T> {
|
|||
*/
|
||||
<T, S extends Publisher<T>> ClientRequest<S> body(S publisher, Class<T> elementClass);
|
||||
|
||||
/**
|
||||
* Set the body of the request to the given {@code Publisher} and return it.
|
||||
* @param publisher the {@code Publisher} to write to the request
|
||||
* @param elementType the type of elements contained in the publisher
|
||||
* @param <T> the type of the elements contained in the publisher
|
||||
* @param <S> the type of the {@code Publisher}.
|
||||
* @return the built request
|
||||
*/
|
||||
<T, S extends Publisher<T>> ClientRequest<S> body(S publisher, ResolvableType elementType);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -20,6 +20,9 @@ import java.util.List;
|
|||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
|
@ -55,6 +58,23 @@ public interface ClientResponse {
|
|||
*/
|
||||
<T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor);
|
||||
|
||||
/**
|
||||
* Extract the body to a {@code Mono}.
|
||||
* @param elementClass the class of element in the {@code Mono}
|
||||
* @param <T> the element type
|
||||
* @return the body as a mono
|
||||
*/
|
||||
<T> Mono<T> bodyToMono(Class<? extends T> elementClass);
|
||||
|
||||
/**
|
||||
* Extract the body to a {@code Flux}.
|
||||
* @param elementClass the class of element in the {@code Flux}
|
||||
* @param <T> the element type
|
||||
* @return the body as a flux
|
||||
*/
|
||||
<T> Flux<T> bodyToFlux(Class<? extends T> elementClass);
|
||||
|
||||
|
||||
/**
|
||||
* Represents the headers of the HTTP response.
|
||||
* @see ClientResponse#headers()
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.stream.Stream;
|
|||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.http.HttpCookie;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
|
@ -151,12 +150,6 @@ class DefaultClientRequestBuilder implements ClientRequest.BodyBuilder {
|
|||
return body(BodyInserters.fromPublisher(publisher, elementClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, S extends Publisher<T>> ClientRequest<S> body(S publisher,
|
||||
ResolvableType elementType) {
|
||||
return body(BodyInserters.fromPublisher(publisher, elementType));
|
||||
}
|
||||
|
||||
private static class BodyInserterRequest<T> implements ClientRequest<T> {
|
||||
|
||||
private final HttpMethod method;
|
||||
|
|
|
@ -23,11 +23,15 @@ import java.util.OptionalLong;
|
|||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.client.reactive.ClientHttpResponse;
|
||||
import org.springframework.http.codec.BodyExtractor;
|
||||
import org.springframework.http.codec.BodyExtractors;
|
||||
import org.springframework.http.codec.HttpMessageReader;
|
||||
|
||||
/**
|
||||
|
@ -71,8 +75,14 @@ class DefaultClientResponse implements ClientResponse {
|
|||
});
|
||||
}
|
||||
|
||||
public ClientHttpResponse clientHttpResponse() {
|
||||
return this.response;
|
||||
@Override
|
||||
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
|
||||
return body(BodyExtractors.toMono(elementClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
|
||||
return body(BodyExtractors.toFlux(elementClass));
|
||||
}
|
||||
|
||||
private class DefaultHeaders implements Headers {
|
||||
|
|
|
@ -115,5 +115,46 @@ public class DefaultClientResponseTests {
|
|||
assertEquals("foo", resultMono.block());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bodyToMono() throws Exception {
|
||||
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
|
||||
DefaultDataBuffer dataBuffer =
|
||||
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
|
||||
Flux<DataBuffer> body = Flux.just(dataBuffer);
|
||||
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
|
||||
when(mockResponse.getHeaders()).thenReturn(httpHeaders);
|
||||
when(mockResponse.getBody()).thenReturn(body);
|
||||
|
||||
Set<HttpMessageReader<?>> messageReaders = Collections
|
||||
.singleton(new DecoderHttpMessageReader<String>(new StringDecoder()));
|
||||
when(mockWebClientStrategies.messageReaders()).thenReturn(messageReaders::stream);
|
||||
|
||||
Mono<String> resultMono = defaultClientResponse.bodyToMono(String.class);
|
||||
assertEquals("foo", resultMono.block());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bodyToFlux() throws Exception {
|
||||
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
|
||||
DefaultDataBuffer dataBuffer =
|
||||
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
|
||||
Flux<DataBuffer> body = Flux.just(dataBuffer);
|
||||
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
|
||||
when(mockResponse.getHeaders()).thenReturn(httpHeaders);
|
||||
when(mockResponse.getBody()).thenReturn(body);
|
||||
|
||||
Set<HttpMessageReader<?>> messageReaders = Collections
|
||||
.singleton(new DecoderHttpMessageReader<String>(new StringDecoder()));
|
||||
when(mockWebClientStrategies.messageReaders()).thenReturn(messageReaders::stream);
|
||||
|
||||
Flux<String> resultFlux = defaultClientResponse.bodyToFlux(String.class);
|
||||
Mono<List<String>> result = resultFlux.collectList();
|
||||
assertEquals(Collections.singletonList("foo"), result.block());
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue