Improve readability of Body[Inserters|Extractors]

This commit is contained in:
Rossen Stoyanchev 2018-05-25 13:30:57 -04:00
parent 9fb2fd66c0
commit d77797f42c
2 changed files with 222 additions and 275 deletions

View File

@ -29,256 +29,226 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpMessage;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.multipart.Part; import org.springframework.http.codec.multipart.Part;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
/** /**
* Implementations of {@link BodyExtractor} that read various bodies, such a reactive streams. * Static factory methods for {@link BodyExtractor} implementations.
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
*/ */
public abstract class BodyExtractors { public abstract class BodyExtractors {
private static final ResolvableType FORM_MAP_TYPE = private static final ResolvableType FORM_DATA_TYPE =
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class); ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class);
private static final ResolvableType MULTIPART_MAP_TYPE = ResolvableType.forClassWithGenerics( private static final ResolvableType MULTIPART_DATA_TYPE = ResolvableType.forClassWithGenerics(
MultiValueMap.class, String.class, Part.class); MultiValueMap.class, String.class, Part.class);
private static final ResolvableType PART_TYPE = ResolvableType.forClass(Part.class); private static final ResolvableType PART_TYPE = ResolvableType.forClass(Part.class);
private static final ResolvableType VOID_TYPE = ResolvableType.forClass(Void.class); private static final ResolvableType VOID_TYPE = ResolvableType.forClass(Void.class);
/** /**
* Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}. * Extractor to decode the input content into {@code Mono<T>}.
* @param elementClass the class of element in the {@code Mono} * @param elementClass the class of the element type to decode to
* @param <T> the element type * @param <T> the element type to decode to
* @return a {@code BodyExtractor} that reads a mono * @return {@code BodyExtractor} for {@code Mono<T>}
*/ */
public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(Class<? extends T> elementClass) { public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(Class<? extends T> elementClass) {
return toMono(ResolvableType.forClass(elementClass)); return toMono(ResolvableType.forClass(elementClass));
} }
/** /**
* Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}. * Variant of {@link #toMono(Class)} for type information with generics.
* The given {@link ParameterizedTypeReference} is used to pass generic type information, for * @param typeRef the type reference for the type to decode to
* instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient} * @param <T> the element type to decode to
* <pre class="code"> * @return {@code BodyExtractor} for {@code Mono<T>}
* Mono&lt;Map&lt;String, String&gt;&gt; body = this.webClient
* .get()
* .uri("http://example.com")
* .exchange()
* .flatMap(r -> r.body(toMono(new ParameterizedTypeReference&lt;Map&lt;String,String&gt;&gt;() {})));
* </pre>
* @param typeReference a reference to the type of element in the {@code Mono}
* @param <T> the element type
* @return a {@code BodyExtractor} that reads a mono
*/ */
public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono( public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ParameterizedTypeReference<T> typeRef) {
ParameterizedTypeReference<T> typeReference) { return toMono(ResolvableType.forType(typeRef.getType()));
return toMono(ResolvableType.forType(typeReference.getType()));
} }
static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) { private static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) {
return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, return (inputMessage, context) ->
elementType, readWithMessageReaders(inputMessage, context, elementType,
(HttpMessageReader<T> reader) -> { (HttpMessageReader<T> reader) -> readToMono(inputMessage, context, elementType, reader),
Optional<ServerHttpResponse> serverResponse = context.serverResponse(); ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)),
if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) {
return reader.readMono(elementType, elementType, (ServerHttpRequest) inputMessage,
serverResponse.get(), context.hints());
}
else {
return reader.readMono(elementType, inputMessage, context.hints());
}
},
ex -> (inputMessage.getHeaders().getContentType() == null) ?
Mono.from(permitEmptyOrFail(inputMessage, ex)) : Mono.error(ex),
Mono::empty); Mono::empty);
} }
/** /**
* Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}. * Extractor to decode the input content into {@code Flux<T>}.
* @param elementClass the class of element in the {@code Flux} * @param elementClass the class of the element type to decode to
* @param <T> the element type * @param <T> the element type to decode to
* @return a {@code BodyExtractor} that reads a flux * @return {@code BodyExtractor} for {@code Flux<T>}
*/ */
public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(Class<? extends T> elementClass) { public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(Class<? extends T> elementClass) {
return toFlux(ResolvableType.forClass(elementClass)); return toFlux(ResolvableType.forClass(elementClass));
} }
/** /**
* Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}. * Variant of {@link #toFlux(Class)} for type information with generics.
* The given {@link ParameterizedTypeReference} is used to pass generic type information, for * @param typeRef the type reference for the type to decode to
* instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient} * @param <T> the element type to decode to
* <pre class="code"> * @return {@code BodyExtractor} for {@code Flux<T>}
* Flux&lt;ServerSentEvent&lt;String&gt;&gt; body = this.webClient
* .get()
* .uri("http://example.com")
* .exchange()
* .flatMap(r -> r.body(toFlux(new ParameterizedTypeReference&lt;ServerSentEvent&lt;String&gt;&gt;() {})));
* </pre>
* @param typeReference a reference to the type of element in the {@code Flux}
* @param <T> the element type
* @return a {@code BodyExtractor} that reads a flux
*/ */
public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux( public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ParameterizedTypeReference<T> typeRef) {
ParameterizedTypeReference<T> typeReference) { return toFlux(ResolvableType.forType(typeRef.getType()));
return toFlux(ResolvableType.forType(typeReference.getType()));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) { private static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) {
return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, return (inputMessage, context) ->
elementType, readWithMessageReaders(inputMessage, context, elementType,
(HttpMessageReader<T> reader) -> { (HttpMessageReader<T> reader) -> readToFlux(inputMessage, context, elementType, reader),
Optional<ServerHttpResponse> serverResponse = context.serverResponse(); ex -> unsupportedErrorHandler(inputMessage, ex),
if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) {
return reader.read(elementType, elementType, (ServerHttpRequest) inputMessage,
serverResponse.get(), context.hints());
}
else {
return reader.read(elementType, inputMessage, context.hints());
}
},
ex -> (inputMessage.getHeaders().getContentType() == null) ?
permitEmptyOrFail(inputMessage, ex) : Flux.error(ex),
Flux::empty); Flux::empty);
} }
@SuppressWarnings("unchecked")
private static <T> Flux<T> permitEmptyOrFail(ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) { // Extractors for specific content ..
return message.getBody().doOnNext(buffer -> {
throw ex;
}).map(o -> (T) o);
}
/** /**
* Return a {@code BodyExtractor} that reads form data into a {@link MultiValueMap}. * Extractor to read form data into {@code MultiValueMap<String, String>}.
* <p>As of 5.1 this method can also be used on the client side to read form * <p>As of 5.1 this method can also be used on the client side to read form
* data from a server response (e.g. OAuth). * data from a server response (e.g. OAuth).
* @return a {@code BodyExtractor} that reads form data * @return {@code BodyExtractor} for form data
*/ */
public static BodyExtractor<Mono<MultiValueMap<String, String>>, ReactiveHttpInputMessage> toFormData() { public static BodyExtractor<Mono<MultiValueMap<String, String>>, ReactiveHttpInputMessage> toFormData() {
return (message, context) -> { return (message, context) -> {
ResolvableType type = FORM_MAP_TYPE; ResolvableType elementType = FORM_DATA_TYPE;
HttpMessageReader<MultiValueMap<String, String>> reader = MediaType mediaType = MediaType.APPLICATION_FORM_URLENCODED;
messageReader(type, MediaType.APPLICATION_FORM_URLENCODED, context); HttpMessageReader<MultiValueMap<String, String>> reader = findReader(elementType, mediaType, context);
Optional<ServerHttpResponse> response = context.serverResponse(); return readToMono(message, context, elementType, reader);
if (response.isPresent() && message instanceof ServerHttpRequest) {
return reader.readMono(type, type, (ServerHttpRequest) message, response.get(), context.hints());
}
else {
return reader.readMono(type, message, context.hints());
}
}; };
} }
/** /**
* Return a {@code BodyExtractor} that reads multipart (i.e. file upload) form data into a * Extractor to read multipart data into a {@code MultiValueMap<String, Part>}.
* {@link MultiValueMap}. * @return {@code BodyExtractor} for multipart data
* @return a {@code BodyExtractor} that reads multipart data
*/ */
// Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not // Parameterized for server-side use
// ReactiveHttpInputMessage like other methods, since reading form data only typically happens on
// the server-side
public static BodyExtractor<Mono<MultiValueMap<String, Part>>, ServerHttpRequest> toMultipartData() { public static BodyExtractor<Mono<MultiValueMap<String, Part>>, ServerHttpRequest> toMultipartData() {
return (serverRequest, context) -> { return (serverRequest, context) -> {
ResolvableType type = MULTIPART_MAP_TYPE; ResolvableType elementType = MULTIPART_DATA_TYPE;
HttpMessageReader<MultiValueMap<String, Part>> reader = MediaType mediaType = MediaType.MULTIPART_FORM_DATA;
messageReader(type, MediaType.MULTIPART_FORM_DATA, context); HttpMessageReader<MultiValueMap<String, Part>> reader = findReader(elementType, mediaType, context);
return context.serverResponse() return readToMono(serverRequest, context, elementType, reader);
.map(response -> reader.readMono(type, type, serverRequest, response, context.hints()))
.orElseGet(() -> reader.readMono(type, serverRequest, context.hints()));
}; };
} }
/** /**
* Return a {@code BodyExtractor} that reads multipart (i.e. file upload) form data into a * Extractor to read multipart data into {@code Flux<Part>}.
* {@link MultiValueMap}. * @return {@code BodyExtractor} for multipart request parts
* @return a {@code BodyExtractor} that reads multipart data
*/ */
// Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not // Parameterized for server-side use
// ReactiveHttpInputMessage like other methods, since reading form data only typically happens on
// the server-side
public static BodyExtractor<Flux<Part>, ServerHttpRequest> toParts() { public static BodyExtractor<Flux<Part>, ServerHttpRequest> toParts() {
return (serverRequest, context) -> { return (serverRequest, context) -> {
ResolvableType type = PART_TYPE; ResolvableType elementType = PART_TYPE;
HttpMessageReader<Part> reader = messageReader(type, MediaType.MULTIPART_FORM_DATA, context); MediaType mediaType = MediaType.MULTIPART_FORM_DATA;
return context.serverResponse() HttpMessageReader<Part> reader = findReader(elementType, mediaType, context);
.map(response -> reader.read(type, type, serverRequest, response, context.hints())) return readToFlux(serverRequest, context, elementType, reader);
.orElseGet(() -> reader.read(type, serverRequest, context.hints()));
}; };
} }
/** /**
* Return a {@code BodyExtractor} that returns the body of the message as a {@link Flux} of * Extractor that returns the raw {@link DataBuffer}s.
* {@link DataBuffer}s. * <p><strong>Note:</strong> the data buffers should be
* <p><strong>Note</strong> that the returned buffers should be released after usage by calling * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)} * released} after being used.
* @return a {@code BodyExtractor} that returns the body * @return {@code BodyExtractor} for data buffers
* @see ReactiveHttpInputMessage#getBody()
*/ */
public static BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> toDataBuffers() { public static BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> toDataBuffers() {
return (inputMessage, context) -> inputMessage.getBody(); return (inputMessage, context) -> inputMessage.getBody();
} }
// Private support methods
private static <T, S extends Publisher<T>> S readWithMessageReaders( private static <T, S extends Publisher<T>> S readWithMessageReaders(
ReactiveHttpInputMessage inputMessage, BodyExtractor.Context context, ResolvableType elementType, ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType,
Function<HttpMessageReader<T>, S> readerFunction, Function<HttpMessageReader<T>, S> readerFunction,
Function<UnsupportedMediaTypeException, S> unsupportedError, Function<UnsupportedMediaTypeException, S> errorFunction,
Supplier<S> empty) { Supplier<S> emptySupplier) {
if (VOID_TYPE.equals(elementType)) { if (VOID_TYPE.equals(elementType)) {
return empty.get(); return emptySupplier.get();
} }
MediaType contentType = contentType(inputMessage);
List<HttpMessageReader<?>> messageReaders = context.messageReaders(); MediaType contentType = Optional.ofNullable(message.getHeaders().getContentType())
return messageReaders.stream() .orElse(MediaType.APPLICATION_OCTET_STREAM);
.filter(r -> r.canRead(elementType, contentType))
return context.messageReaders().stream()
.filter(reader -> reader.canRead(elementType, contentType))
.findFirst() .findFirst()
.map(BodyExtractors::<T>cast) .map(BodyExtractors::<T>cast)
.map(readerFunction) .map(readerFunction)
.orElseGet(() -> { .orElseGet(() -> errorFunction.apply(unsupportedError(context, elementType, contentType)));
List<MediaType> supportedMediaTypes = messageReaders.stream()
.flatMap(reader -> reader.getReadableMediaTypes().stream())
.collect(Collectors.toList());
UnsupportedMediaTypeException error =
new UnsupportedMediaTypeException(contentType, supportedMediaTypes, elementType);
return unsupportedError.apply(error);
});
} }
private static <T> HttpMessageReader<T> messageReader(ResolvableType elementType, private static UnsupportedMediaTypeException unsupportedError(BodyExtractor.Context context,
MediaType mediaType, BodyExtractor.Context context) { ResolvableType elementType, MediaType contentType) {
List<MediaType> supportedMediaTypes = context.messageReaders().stream()
.flatMap(reader -> reader.getReadableMediaTypes().stream())
.collect(Collectors.toList());
return new UnsupportedMediaTypeException(contentType, supportedMediaTypes, elementType);
}
private static <T> Mono<T> readToMono(ReactiveHttpInputMessage message, BodyExtractor.Context context,
ResolvableType type, HttpMessageReader<T> reader) {
return context.serverResponse()
.map(response -> reader.readMono(type, type, (ServerHttpRequest) message, response, context.hints()))
.orElseGet(() -> reader.readMono(type, message, context.hints()));
}
private static <T> Flux<T> readToFlux(ReactiveHttpInputMessage message, BodyExtractor.Context context,
ResolvableType type, HttpMessageReader<T> reader) {
return context.serverResponse()
.map(response -> reader.read(type, type, (ServerHttpRequest) message, response, context.hints()))
.orElseGet(() -> reader.read(type, message, context.hints()));
}
private static <T> Flux<T> unsupportedErrorHandler(
ReactiveHttpInputMessage inputMessage, UnsupportedMediaTypeException ex) {
if (inputMessage.getHeaders().getContentType() == null) {
// Empty body with no content type is ok
return inputMessage.getBody().map(o -> {
throw ex;
});
}
else {
return Flux.error(ex);
}
}
private static <T> HttpMessageReader<T> findReader(
ResolvableType elementType, MediaType mediaType, BodyExtractor.Context context) {
return context.messageReaders().stream() return context.messageReaders().stream()
.filter(messageReader -> messageReader.canRead(elementType, mediaType)) .filter(messageReader -> messageReader.canRead(elementType, mediaType))
.findFirst() .findFirst()
.map(BodyExtractors::<T>cast) .map(BodyExtractors::<T>cast)
.orElseThrow(() -> new IllegalStateException( .orElseThrow(() -> new IllegalStateException(
"Could not find HttpMessageReader that supports \"" + mediaType + "No HttpMessageReader for \"" + mediaType + "\" and \"" + elementType + "\""));
"\" and \"" + elementType + "\""));
}
private static MediaType contentType(HttpMessage message) {
MediaType result = message.getHeaders().getContentType();
return result != null ? result : MediaType.APPLICATION_OCTET_STREAM;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static <T> HttpMessageReader<T> cast(HttpMessageReader<?> messageReader) { private static <T> HttpMessageReader<T> cast(HttpMessageReader<?> reader) {
return (HttpMessageReader<T>) messageReader; return (HttpMessageReader<T>) reader;
} }
} }

View File

@ -17,7 +17,6 @@
package org.springframework.web.reactive.function; package org.springframework.web.reactive.function;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -34,7 +33,6 @@ import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.client.reactive.ClientHttpRequest; import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.codec.HttpMessageWriter; import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -46,147 +44,124 @@ import org.springframework.util.MultiValueMap;
* server-sent events, resources, etc. * server-sent events, resources, etc.
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
*/ */
public abstract class BodyInserters { public abstract class BodyInserters {
private static final ResolvableType RESOURCE_TYPE = private static final ResolvableType RESOURCE_TYPE = ResolvableType.forClass(Resource.class);
ResolvableType.forClass(Resource.class);
private static final ResolvableType SERVER_SIDE_EVENT_TYPE = private static final ResolvableType SSE_TYPE = ResolvableType.forClass(ServerSentEvent.class);
ResolvableType.forClass(ServerSentEvent.class);
private static final ResolvableType FORM_TYPE = private static final ResolvableType FORM_DATA_TYPE =
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class); ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class);
private static final ResolvableType MULTIPART_VALUE_TYPE = ResolvableType.forClassWithGenerics( private static final ResolvableType MULTIPART_DATA_TYPE = ResolvableType.forClassWithGenerics(
MultiValueMap.class, String.class, Object.class); MultiValueMap.class, String.class, Object.class);
private static final BodyInserter<Void, ReactiveHttpOutputMessage> EMPTY = private static final BodyInserter<Void, ReactiveHttpOutputMessage> EMPTY_INSERTER =
(response, context) -> response.setComplete(); (response, context) -> response.setComplete();
/** /**
* Return an empty {@code BodyInserter} that writes nothing. * Inserter that does not write.
* @return an empty {@code BodyInserter} * @return the inserter
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> BodyInserter<T, ReactiveHttpOutputMessage> empty() { public static <T> BodyInserter<T, ReactiveHttpOutputMessage> empty() {
return (BodyInserter<T, ReactiveHttpOutputMessage>)EMPTY; return (BodyInserter<T, ReactiveHttpOutputMessage>) EMPTY_INSERTER;
} }
/** /**
* Return a {@code BodyInserter} that writes the given single object. * Inserter to write the given object.
* <p>Note also that * <p>Alternatively, consider using the {@code syncBody(Object)} shortcuts on
* {@link org.springframework.web.reactive.function.client.WebClient WebClient} and * {@link org.springframework.web.reactive.function.client.WebClient WebClient} and
* {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse} * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse}.
* each offer a {@code syncBody(Object)} shortcut for providing an Object * @param body the body to write to the response
* as the body. * @param <T> the type of the body
* @param body the body of the response * @return the inserter to write a single object
* @return a {@code BodyInserter} that writes a single object
*/ */
public static <T> BodyInserter<T, ReactiveHttpOutputMessage> fromObject(T body) { public static <T> BodyInserter<T, ReactiveHttpOutputMessage> fromObject(T body) {
return bodyInserterFor(Mono.just(body), ResolvableType.forInstance(body)); return (message, context) ->
writeWithMessageWriters(message, context, Mono.just(body), ResolvableType.forInstance(body));
} }
/** /**
* Return a {@code BodyInserter} that writes the given {@link Publisher}. * Inserter to write the given {@link Publisher}.
* <p>Note also that * <p>Alternatively, consider using the {@code body} shortcuts on
* {@link org.springframework.web.reactive.function.client.WebClient WebClient} and * {@link org.springframework.web.reactive.function.client.WebClient WebClient} and
* {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse} * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse}.
* each offer {@code body} shortcut methods for providing a Publisher as the body. * @param publisher the publisher to write with
* @param publisher the publisher to stream to the response body * @param elementClass the type of elements in the publisher
* @param elementClass the class of elements contained in the publisher
* @param <T> the type of the elements contained in the publisher * @param <T> the type of the elements contained in the publisher
* @param <P> the type of the {@code Publisher} * @param <P> the {@code Publisher} type
* @return a {@code BodyInserter} that writes a {@code Publisher} * @return the inserter to write a {@code Publisher}
*/ */
public static <T, P extends Publisher<T>> BodyInserter<P, ReactiveHttpOutputMessage> fromPublisher( public static <T, P extends Publisher<T>> BodyInserter<P, ReactiveHttpOutputMessage> fromPublisher(
P publisher, Class<T> elementClass) { P publisher, Class<T> elementClass) {
return bodyInserterFor(publisher, ResolvableType.forClass(elementClass)); return (message, context) ->
writeWithMessageWriters(message, context, publisher, ResolvableType.forClass(elementClass));
} }
/** /**
* Return a {@code BodyInserter} that writes the given {@link Publisher}. * Inserter to write the given {@link Publisher}.
* <p>Note also that * <p>Alternatively, consider using the {@code body} shortcuts on
* {@link org.springframework.web.reactive.function.client.WebClient WebClient} and * {@link org.springframework.web.reactive.function.client.WebClient WebClient} and
* {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse} * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse}.
* each offer {@code body} shortcut methods for providing a Publisher as the body. * @param publisher the publisher to write with
* @param publisher the publisher to stream to the response body * @param typeRef the type of elements contained in the publisher
* @param typeReference the type of elements contained in the publisher
* @param <T> the type of the elements contained in the publisher * @param <T> the type of the elements contained in the publisher
* @param <P> the type of the {@code Publisher} * @param <P> the {@code Publisher} type
* @return a {@code BodyInserter} that writes a {@code Publisher} * @return the inserter to write a {@code Publisher}
*/ */
public static <T, P extends Publisher<T>> BodyInserter<P, ReactiveHttpOutputMessage> fromPublisher( public static <T, P extends Publisher<T>> BodyInserter<P, ReactiveHttpOutputMessage> fromPublisher(
P publisher, ParameterizedTypeReference<T> typeReference) { P publisher, ParameterizedTypeReference<T> typeRef) {
return bodyInserterFor(publisher, ResolvableType.forType(typeReference.getType())); return (message, context) ->
writeWithMessageWriters(message, context, publisher, ResolvableType.forType(typeRef.getType()));
} }
/** /**
* Return a {@code BodyInserter} that writes the given {@code Resource}. * Inserter to write the given {@code Resource}.
* <p>If the resource can be resolved to a {@linkplain Resource#getFile() file}, it will * <p>If the resource can be resolved to a {@linkplain Resource#getFile() file}, it will
* be copied using <a href="https://en.wikipedia.org/wiki/Zero-copy">zero-copy</a>. * be copied using <a href="https://en.wikipedia.org/wiki/Zero-copy">zero-copy</a>.
* @param resource the resource to write to the output message * @param resource the resource to write to the output message
* @param <T> the type of the {@code Resource} * @param <T> the type of the {@code Resource}
* @return a {@code BodyInserter} that writes a {@code Publisher} * @return the inserter to write a {@code Publisher}
*/ */
public static <T extends Resource> BodyInserter<T, ReactiveHttpOutputMessage> fromResource(T resource) { public static <T extends Resource> BodyInserter<T, ReactiveHttpOutputMessage> fromResource(T resource) {
return (outputMessage, context) -> { return (outputMessage, context) -> {
Mono<T> inputStream = Mono.just(resource); ResolvableType elementType = RESOURCE_TYPE;
HttpMessageWriter<Resource> messageWriter = resourceHttpMessageWriter(context); HttpMessageWriter<Resource> writer = findWriter(context, elementType, null);
Optional<ServerHttpRequest> serverRequest = context.serverRequest(); return write(Mono.just(resource), elementType, null, outputMessage, context, writer);
if (serverRequest.isPresent() && outputMessage instanceof ServerHttpResponse) {
return messageWriter.write(inputStream, RESOURCE_TYPE, RESOURCE_TYPE, null,
serverRequest.get(), (ServerHttpResponse) outputMessage, context.hints());
}
else {
return messageWriter.write(inputStream, RESOURCE_TYPE, null, outputMessage, context.hints());
}
}; };
} }
private static HttpMessageWriter<Resource> resourceHttpMessageWriter(BodyInserter.Context context) {
return context.messageWriters().stream()
.filter(messageWriter -> messageWriter.canWrite(RESOURCE_TYPE, null))
.findFirst()
.map(BodyInserters::<Resource>cast)
.orElseThrow(() -> new IllegalStateException(
"Could not find HttpMessageWriter that supports Resource objects"));
}
/** /**
* Return a {@code BodyInserter} that writes the given {@code ServerSentEvent} publisher. * Inserter to write the given {@code ServerSentEvent} publisher.
* <p>Note that a SSE {@code BodyInserter} can also be obtained by passing a stream of strings * <p>Alternatively, you can provide event data objects via
* or POJOs (to be encoded as JSON) to {@link #fromPublisher(Publisher, Class)}, and specifying a * {@link #fromPublisher(Publisher, Class)}, and set the "Content-Type" to
* {@link MediaType#TEXT_EVENT_STREAM text/event-stream} Content-Type. * {@link MediaType#TEXT_EVENT_STREAM text/event-stream}.
* @param eventsPublisher the {@code ServerSentEvent} publisher to write to the response body * @param eventsPublisher the {@code ServerSentEvent} publisher to write to the response body
* @param <T> the type of the elements contained in the {@link ServerSentEvent} * @param <T> the type of the data elements in the {@link ServerSentEvent}
* @return a {@code BodyInserter} that writes a {@code ServerSentEvent} publisher * @return the inserter to write a {@code ServerSentEvent} publisher
* @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a> * @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
*/ */
// Note that the returned BodyInserter is parameterized to ServerHttpResponse, not // Parameterized for server-side use
// ReactiveHttpOutputMessage like other methods, since sending SSEs only typically happens on
// the server-side
public static <T, S extends Publisher<ServerSentEvent<T>>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents( public static <T, S extends Publisher<ServerSentEvent<T>>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents(
S eventsPublisher) { S eventsPublisher) {
return (serverResponse, context) -> { return (serverResponse, context) -> {
HttpMessageWriter<ServerSentEvent<T>> messageWriter = ResolvableType elmentType = SSE_TYPE;
findMessageWriter(context, SERVER_SIDE_EVENT_TYPE, MediaType.TEXT_EVENT_STREAM); MediaType mediaType = MediaType.TEXT_EVENT_STREAM;
return context.serverRequest() HttpMessageWriter<ServerSentEvent<T>> writer = findWriter(context, elmentType, mediaType);
.map(serverRequest -> messageWriter.write(eventsPublisher, SERVER_SIDE_EVENT_TYPE, return write(eventsPublisher, elmentType, mediaType, serverResponse, context, writer);
SERVER_SIDE_EVENT_TYPE, MediaType.TEXT_EVENT_STREAM, serverRequest,
serverResponse, context.hints()))
.orElseGet(() -> messageWriter.write(eventsPublisher, SERVER_SIDE_EVENT_TYPE,
MediaType.TEXT_EVENT_STREAM, serverResponse, context.hints()));
}; };
} }
/** /**
* Return a {@link FormInserter} that writes the given {@code MultiValueMap} * Return a {@link FormInserter} to write the given {@code MultiValueMap}
* as URL-encoded form data. The returned inserter allows for additional * as URL-encoded form data. The returned inserter allows for additional
* entries to be added via {@link FormInserter#with(String, Object)}. * entries to be added via {@link FormInserter#with(String, Object)}.
* *
@ -204,7 +179,7 @@ public abstract class BodyInserters {
} }
/** /**
* Return a {@link FormInserter} that writes the given key-value pair as * Return a {@link FormInserter} to write the given key-value pair as
* URL-encoded form data. The returned inserter allows for additional * URL-encoded form data. The returned inserter allows for additional
* entries to be added via {@link FormInserter#with(String, Object)}. * entries to be added via {@link FormInserter#with(String, Object)}.
* @param name the key to add to the form * @param name the key to add to the form
@ -218,7 +193,7 @@ public abstract class BodyInserters {
} }
/** /**
* Return a {@link MultipartInserter} that writes the given * Return a {@link MultipartInserter} to write the given
* {@code MultiValueMap} as multipart data. Values in the map can be an * {@code MultiValueMap} as multipart data. Values in the map can be an
* Object or an {@link HttpEntity}. * Object or an {@link HttpEntity}.
* <p>Note that you can also build the multipart data externally with * <p>Note that you can also build the multipart data externally with
@ -234,7 +209,7 @@ public abstract class BodyInserters {
} }
/** /**
* Return a {@link MultipartInserter} that writes the given parts, * Return a {@link MultipartInserter} to write the given parts,
* as multipart data. Values in the map can be an Object or an * as multipart data. Values in the map can be an Object or an
* {@link HttpEntity}. * {@link HttpEntity}.
* <p>Note that you can also build the multipart data externally with * <p>Note that you can also build the multipart data externally with
@ -251,7 +226,7 @@ public abstract class BodyInserters {
} }
/** /**
* Return a {@link MultipartInserter} that writes the given asynchronous parts, * Return a {@link MultipartInserter} to write the given asynchronous parts,
* as multipart data. * as multipart data.
* <p>Note that you can also build the multipart data externally with * <p>Note that you can also build the multipart data externally with
* {@link MultipartBodyBuilder}, and pass the resulting map directly to the * {@link MultipartBodyBuilder}, and pass the resulting map directly to the
@ -286,11 +261,11 @@ public abstract class BodyInserters {
} }
/** /**
* Return a {@code BodyInserter} that writes the given * Inserter to write the given
* {@code Publisher<DataBuffer>} to the body. * {@code Publisher<DataBuffer>} to the body.
* @param publisher the data buffer publisher to write * @param publisher the data buffer publisher to write
* @param <T> the type of the publisher * @param <T> the type of the publisher
* @return a {@code BodyInserter} that writes directly to the body * @return the inserter to write directly to the body
* @see ReactiveHttpOutputMessage#writeWith(Publisher) * @see ReactiveHttpOutputMessage#writeWith(Publisher)
*/ */
public static <T extends Publisher<DataBuffer>> BodyInserter<T, ReactiveHttpOutputMessage> fromDataBuffers( public static <T extends Publisher<DataBuffer>> BodyInserter<T, ReactiveHttpOutputMessage> fromDataBuffers(
@ -301,47 +276,49 @@ public abstract class BodyInserters {
} }
private static <T, P extends Publisher<?>, M extends ReactiveHttpOutputMessage> BodyInserter<T, M> bodyInserterFor( private static <P extends Publisher<?>, M extends ReactiveHttpOutputMessage> Mono<Void> writeWithMessageWriters(
P body, ResolvableType bodyType) { M outputMessage, BodyInserter.Context context, P body, ResolvableType bodyType) {
return (outputMessage, context) -> { MediaType mediaType = outputMessage.getHeaders().getContentType();
MediaType contentType = outputMessage.getHeaders().getContentType(); return context.messageWriters().stream()
List<HttpMessageWriter<?>> messageWriters = context.messageWriters(); .filter(messageWriter -> messageWriter.canWrite(bodyType, mediaType))
return messageWriters.stream()
.filter(messageWriter -> messageWriter.canWrite(bodyType, contentType))
.findFirst() .findFirst()
.map(BodyInserters::cast) .map(BodyInserters::cast)
.map(messageWriter -> { .map(writer -> write(body, bodyType, mediaType, outputMessage, context, writer))
Optional<ServerHttpRequest> serverRequest = context.serverRequest(); .orElseGet(() -> Mono.error(unsupportedError(bodyType, context, mediaType)));
if (serverRequest.isPresent() && outputMessage instanceof ServerHttpResponse) {
return messageWriter.write(body, bodyType, bodyType, contentType,
serverRequest.get(), (ServerHttpResponse) outputMessage,
context.hints());
} }
else {
return messageWriter.write(body, bodyType, contentType, outputMessage, context.hints()); private static UnsupportedMediaTypeException unsupportedError(ResolvableType bodyType,
} BodyInserter.Context context, @Nullable MediaType mediaType) {
})
.orElseGet(() -> { List<MediaType> supportedMediaTypes = context.messageWriters().stream()
List<MediaType> supportedMediaTypes = messageWriters.stream()
.flatMap(reader -> reader.getWritableMediaTypes().stream()) .flatMap(reader -> reader.getWritableMediaTypes().stream())
.collect(Collectors.toList()); .collect(Collectors.toList());
UnsupportedMediaTypeException error =
new UnsupportedMediaTypeException(contentType, supportedMediaTypes, bodyType); return new UnsupportedMediaTypeException(mediaType, supportedMediaTypes, bodyType);
return Mono.error(error);
});
};
} }
private static <T> HttpMessageWriter<T> findMessageWriter( private static <T> Mono<Void> write(Publisher<? extends T> input, ResolvableType type,
BodyInserter.Context context, ResolvableType type, MediaType mediaType) { @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
BodyInserter.Context context, HttpMessageWriter<T> writer) {
return context.serverRequest()
.map(request -> {
ServerHttpResponse response = (ServerHttpResponse) message;
return writer.write(input, type, type, mediaType, request, response, context.hints());
})
.orElseGet(() -> writer.write(input, type, mediaType, message, context.hints()));
}
private static <T> HttpMessageWriter<T> findWriter(
BodyInserter.Context context, ResolvableType elementType, @Nullable MediaType mediaType) {
return context.messageWriters().stream() return context.messageWriters().stream()
.filter(messageWriter -> messageWriter.canWrite(type, mediaType)) .filter(messageWriter -> messageWriter.canWrite(elementType, mediaType))
.findFirst() .findFirst()
.map(BodyInserters::<T>cast) .map(BodyInserters::<T>cast)
.orElseThrow(() -> new IllegalStateException( .orElseThrow(() -> new IllegalStateException(
"Could not find HttpMessageWriter that supports " + mediaType)); "No HttpMessageWriter for \"" + mediaType + "\" and \"" + elementType + "\""));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -425,8 +402,8 @@ public abstract class BodyInserters {
@Override @Override
public Mono<Void> insert(ClientHttpRequest outputMessage, Context context) { public Mono<Void> insert(ClientHttpRequest outputMessage, Context context) {
HttpMessageWriter<MultiValueMap<String, String>> messageWriter = HttpMessageWriter<MultiValueMap<String, String>> messageWriter =
findMessageWriter(context, FORM_TYPE, MediaType.APPLICATION_FORM_URLENCODED); findWriter(context, FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED);
return messageWriter.write(Mono.just(this.data), FORM_TYPE, return messageWriter.write(Mono.just(this.data), FORM_DATA_TYPE,
MediaType.APPLICATION_FORM_URLENCODED, MediaType.APPLICATION_FORM_URLENCODED,
outputMessage, context.hints()); outputMessage, context.hints());
} }
@ -477,9 +454,9 @@ public abstract class BodyInserters {
@Override @Override
public Mono<Void> insert(ClientHttpRequest outputMessage, Context context) { public Mono<Void> insert(ClientHttpRequest outputMessage, Context context) {
HttpMessageWriter<MultiValueMap<String, HttpEntity<?>>> messageWriter = HttpMessageWriter<MultiValueMap<String, HttpEntity<?>>> messageWriter =
findMessageWriter(context, MULTIPART_VALUE_TYPE, MediaType.MULTIPART_FORM_DATA); findWriter(context, MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA);
MultiValueMap<String, HttpEntity<?>> body = this.builder.build(); MultiValueMap<String, HttpEntity<?>> body = this.builder.build();
return messageWriter.write(Mono.just(body), MULTIPART_VALUE_TYPE, return messageWriter.write(Mono.just(body), MULTIPART_DATA_TYPE,
MediaType.MULTIPART_FORM_DATA, MediaType.MULTIPART_FORM_DATA,
outputMessage, context.hints()); outputMessage, context.hints());
} }