diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java index 84b406949f..5faa16ded0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java @@ -29,256 +29,226 @@ import reactor.core.publisher.Mono; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.http.HttpMessage; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.codec.multipart.Part; import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.MultiValueMap; /** - * Implementations of {@link BodyExtractor} that read various bodies, such a reactive streams. + * Static factory methods for {@link BodyExtractor} implementations. * * @author Arjen Poutsma * @author Sebastien Deleuze + * @author Rossen Stoyanchev * @since 5.0 */ public abstract class BodyExtractors { - private static final ResolvableType FORM_MAP_TYPE = + private static final ResolvableType FORM_DATA_TYPE = ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class); - private static final ResolvableType MULTIPART_MAP_TYPE = ResolvableType.forClassWithGenerics( + private static final ResolvableType MULTIPART_DATA_TYPE = ResolvableType.forClassWithGenerics( MultiValueMap.class, String.class, Part.class); private static final ResolvableType PART_TYPE = ResolvableType.forClass(Part.class); private static final ResolvableType VOID_TYPE = ResolvableType.forClass(Void.class); + /** - * Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}. - * @param elementClass the class of element in the {@code Mono} - * @param the element type - * @return a {@code BodyExtractor} that reads a mono + * Extractor to decode the input content into {@code Mono}. + * @param elementClass the class of the element type to decode to + * @param the element type to decode to + * @return {@code BodyExtractor} for {@code Mono} */ public static BodyExtractor, ReactiveHttpInputMessage> toMono(Class elementClass) { return toMono(ResolvableType.forClass(elementClass)); } /** - * Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}. - * The given {@link ParameterizedTypeReference} is used to pass generic type information, for - * instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient} - *
-	 * Mono<Map<String, String>> body = this.webClient
-	 *  .get()
-	 *  .uri("http://example.com")
-	 *  .exchange()
-	 *  .flatMap(r -> r.body(toMono(new ParameterizedTypeReference<Map<String,String>>() {})));
-	 * 
- * @param typeReference a reference to the type of element in the {@code Mono} - * @param the element type - * @return a {@code BodyExtractor} that reads a mono + * Variant of {@link #toMono(Class)} for type information with generics. + * @param typeRef the type reference for the type to decode to + * @param the element type to decode to + * @return {@code BodyExtractor} for {@code Mono} */ - public static BodyExtractor, ReactiveHttpInputMessage> toMono( - ParameterizedTypeReference typeReference) { - - return toMono(ResolvableType.forType(typeReference.getType())); + public static BodyExtractor, ReactiveHttpInputMessage> toMono(ParameterizedTypeReference typeRef) { + return toMono(ResolvableType.forType(typeRef.getType())); } - static BodyExtractor, ReactiveHttpInputMessage> toMono(ResolvableType elementType) { - return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, - elementType, - (HttpMessageReader reader) -> { - Optional serverResponse = context.serverResponse(); - if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { - return reader.readMono(elementType, elementType, (ServerHttpRequest) inputMessage, - serverResponse.get(), context.hints()); - } - else { - return reader.readMono(elementType, inputMessage, context.hints()); - } - }, - ex -> (inputMessage.getHeaders().getContentType() == null) ? - Mono.from(permitEmptyOrFail(inputMessage, ex)) : Mono.error(ex), - Mono::empty); + private static BodyExtractor, ReactiveHttpInputMessage> toMono(ResolvableType elementType) { + return (inputMessage, context) -> + readWithMessageReaders(inputMessage, context, elementType, + (HttpMessageReader reader) -> readToMono(inputMessage, context, elementType, reader), + ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)), + Mono::empty); } /** - * Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}. - * @param elementClass the class of element in the {@code Flux} - * @param the element type - * @return a {@code BodyExtractor} that reads a flux + * Extractor to decode the input content into {@code Flux}. + * @param elementClass the class of the element type to decode to + * @param the element type to decode to + * @return {@code BodyExtractor} for {@code Flux} */ public static BodyExtractor, ReactiveHttpInputMessage> toFlux(Class elementClass) { return toFlux(ResolvableType.forClass(elementClass)); } /** - * Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}. - * The given {@link ParameterizedTypeReference} is used to pass generic type information, for - * instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient} - *
-	 * Flux<ServerSentEvent<String>> body = this.webClient
-	 *  .get()
-	 *  .uri("http://example.com")
-	 *  .exchange()
-	 *  .flatMap(r -> r.body(toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})));
-	 * 
- * @param typeReference a reference to the type of element in the {@code Flux} - * @param the element type - * @return a {@code BodyExtractor} that reads a flux + * Variant of {@link #toFlux(Class)} for type information with generics. + * @param typeRef the type reference for the type to decode to + * @param the element type to decode to + * @return {@code BodyExtractor} for {@code Flux} */ - public static BodyExtractor, ReactiveHttpInputMessage> toFlux( - ParameterizedTypeReference typeReference) { - - return toFlux(ResolvableType.forType(typeReference.getType())); + public static BodyExtractor, ReactiveHttpInputMessage> toFlux(ParameterizedTypeReference typeRef) { + return toFlux(ResolvableType.forType(typeRef.getType())); } @SuppressWarnings("unchecked") - static BodyExtractor, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) { - return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, - elementType, - (HttpMessageReader reader) -> { - Optional serverResponse = context.serverResponse(); - if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { - return reader.read(elementType, elementType, (ServerHttpRequest) inputMessage, - serverResponse.get(), context.hints()); - } - else { - return reader.read(elementType, inputMessage, context.hints()); - } - }, - ex -> (inputMessage.getHeaders().getContentType() == null) ? - permitEmptyOrFail(inputMessage, ex) : Flux.error(ex), - Flux::empty); + private static BodyExtractor, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) { + return (inputMessage, context) -> + readWithMessageReaders(inputMessage, context, elementType, + (HttpMessageReader reader) -> readToFlux(inputMessage, context, elementType, reader), + ex -> unsupportedErrorHandler(inputMessage, ex), + Flux::empty); } - @SuppressWarnings("unchecked") - private static Flux permitEmptyOrFail(ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) { - return message.getBody().doOnNext(buffer -> { - throw ex; - }).map(o -> (T) o); - } + + // Extractors for specific content .. /** - * Return a {@code BodyExtractor} that reads form data into a {@link MultiValueMap}. + * Extractor to read form data into {@code MultiValueMap}. *

As of 5.1 this method can also be used on the client side to read form * data from a server response (e.g. OAuth). - * @return a {@code BodyExtractor} that reads form data + * @return {@code BodyExtractor} for form data */ public static BodyExtractor>, ReactiveHttpInputMessage> toFormData() { return (message, context) -> { - ResolvableType type = FORM_MAP_TYPE; - HttpMessageReader> reader = - messageReader(type, MediaType.APPLICATION_FORM_URLENCODED, context); - Optional response = context.serverResponse(); - if (response.isPresent() && message instanceof ServerHttpRequest) { - return reader.readMono(type, type, (ServerHttpRequest) message, response.get(), context.hints()); - } - else { - return reader.readMono(type, message, context.hints()); - } + ResolvableType elementType = FORM_DATA_TYPE; + MediaType mediaType = MediaType.APPLICATION_FORM_URLENCODED; + HttpMessageReader> reader = findReader(elementType, mediaType, context); + return readToMono(message, context, elementType, reader); }; } /** - * Return a {@code BodyExtractor} that reads multipart (i.e. file upload) form data into a - * {@link MultiValueMap}. - * @return a {@code BodyExtractor} that reads multipart data + * Extractor to read multipart data into a {@code MultiValueMap}. + * @return {@code BodyExtractor} for multipart data */ - // Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not - // ReactiveHttpInputMessage like other methods, since reading form data only typically happens on - // the server-side + // Parameterized for server-side use public static BodyExtractor>, ServerHttpRequest> toMultipartData() { return (serverRequest, context) -> { - ResolvableType type = MULTIPART_MAP_TYPE; - HttpMessageReader> reader = - messageReader(type, MediaType.MULTIPART_FORM_DATA, context); - return context.serverResponse() - .map(response -> reader.readMono(type, type, serverRequest, response, context.hints())) - .orElseGet(() -> reader.readMono(type, serverRequest, context.hints())); + ResolvableType elementType = MULTIPART_DATA_TYPE; + MediaType mediaType = MediaType.MULTIPART_FORM_DATA; + HttpMessageReader> reader = findReader(elementType, mediaType, context); + return readToMono(serverRequest, context, elementType, reader); }; } /** - * Return a {@code BodyExtractor} that reads multipart (i.e. file upload) form data into a - * {@link MultiValueMap}. - * @return a {@code BodyExtractor} that reads multipart data + * Extractor to read multipart data into {@code Flux}. + * @return {@code BodyExtractor} for multipart request parts */ - // Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not - // ReactiveHttpInputMessage like other methods, since reading form data only typically happens on - // the server-side + // Parameterized for server-side use public static BodyExtractor, ServerHttpRequest> toParts() { return (serverRequest, context) -> { - ResolvableType type = PART_TYPE; - HttpMessageReader reader = messageReader(type, MediaType.MULTIPART_FORM_DATA, context); - return context.serverResponse() - .map(response -> reader.read(type, type, serverRequest, response, context.hints())) - .orElseGet(() -> reader.read(type, serverRequest, context.hints())); + ResolvableType elementType = PART_TYPE; + MediaType mediaType = MediaType.MULTIPART_FORM_DATA; + HttpMessageReader reader = findReader(elementType, mediaType, context); + return readToFlux(serverRequest, context, elementType, reader); }; } /** - * Return a {@code BodyExtractor} that returns the body of the message as a {@link Flux} of - * {@link DataBuffer}s. - *

Note that the returned buffers should be released after usage by calling - * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)} - * @return a {@code BodyExtractor} that returns the body - * @see ReactiveHttpInputMessage#getBody() + * Extractor that returns the raw {@link DataBuffer}s. + *

Note: the data buffers should be + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) + * released} after being used. + * @return {@code BodyExtractor} for data buffers */ public static BodyExtractor, ReactiveHttpInputMessage> toDataBuffers() { return (inputMessage, context) -> inputMessage.getBody(); } + // Private support methods + private static > S readWithMessageReaders( - ReactiveHttpInputMessage inputMessage, BodyExtractor.Context context, ResolvableType elementType, + ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType, Function, S> readerFunction, - Function unsupportedError, - Supplier empty) { + Function errorFunction, + Supplier emptySupplier) { if (VOID_TYPE.equals(elementType)) { - return empty.get(); + return emptySupplier.get(); } - MediaType contentType = contentType(inputMessage); - List> messageReaders = context.messageReaders(); - return messageReaders.stream() - .filter(r -> r.canRead(elementType, contentType)) + + MediaType contentType = Optional.ofNullable(message.getHeaders().getContentType()) + .orElse(MediaType.APPLICATION_OCTET_STREAM); + + return context.messageReaders().stream() + .filter(reader -> reader.canRead(elementType, contentType)) .findFirst() .map(BodyExtractors::cast) .map(readerFunction) - .orElseGet(() -> { - List supportedMediaTypes = messageReaders.stream() - .flatMap(reader -> reader.getReadableMediaTypes().stream()) - .collect(Collectors.toList()); - UnsupportedMediaTypeException error = - new UnsupportedMediaTypeException(contentType, supportedMediaTypes, elementType); - return unsupportedError.apply(error); - }); + .orElseGet(() -> errorFunction.apply(unsupportedError(context, elementType, contentType))); } - private static HttpMessageReader messageReader(ResolvableType elementType, - MediaType mediaType, BodyExtractor.Context context) { + private static UnsupportedMediaTypeException unsupportedError(BodyExtractor.Context context, + ResolvableType elementType, MediaType contentType) { + + List supportedMediaTypes = context.messageReaders().stream() + .flatMap(reader -> reader.getReadableMediaTypes().stream()) + .collect(Collectors.toList()); + + return new UnsupportedMediaTypeException(contentType, supportedMediaTypes, elementType); + } + + private static Mono readToMono(ReactiveHttpInputMessage message, BodyExtractor.Context context, + ResolvableType type, HttpMessageReader reader) { + + return context.serverResponse() + .map(response -> reader.readMono(type, type, (ServerHttpRequest) message, response, context.hints())) + .orElseGet(() -> reader.readMono(type, message, context.hints())); + } + + private static Flux readToFlux(ReactiveHttpInputMessage message, BodyExtractor.Context context, + ResolvableType type, HttpMessageReader reader) { + + return context.serverResponse() + .map(response -> reader.read(type, type, (ServerHttpRequest) message, response, context.hints())) + .orElseGet(() -> reader.read(type, message, context.hints())); + } + + private static Flux unsupportedErrorHandler( + ReactiveHttpInputMessage inputMessage, UnsupportedMediaTypeException ex) { + + if (inputMessage.getHeaders().getContentType() == null) { + // Empty body with no content type is ok + return inputMessage.getBody().map(o -> { + throw ex; + }); + } + else { + return Flux.error(ex); + } + } + + private static HttpMessageReader findReader( + ResolvableType elementType, MediaType mediaType, BodyExtractor.Context context) { + return context.messageReaders().stream() .filter(messageReader -> messageReader.canRead(elementType, mediaType)) .findFirst() .map(BodyExtractors::cast) .orElseThrow(() -> new IllegalStateException( - "Could not find HttpMessageReader that supports \"" + mediaType + - "\" and \"" + elementType + "\"")); - } - - private static MediaType contentType(HttpMessage message) { - MediaType result = message.getHeaders().getContentType(); - return result != null ? result : MediaType.APPLICATION_OCTET_STREAM; + "No HttpMessageReader for \"" + mediaType + "\" and \"" + elementType + "\"")); } @SuppressWarnings("unchecked") - private static HttpMessageReader cast(HttpMessageReader messageReader) { - return (HttpMessageReader) messageReader; + private static HttpMessageReader cast(HttpMessageReader reader) { + return (HttpMessageReader) reader; } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserters.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserters.java index 2ded80b24a..c295d2cf5d 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserters.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserters.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.function; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import org.reactivestreams.Publisher; @@ -34,7 +33,6 @@ import org.springframework.http.client.MultipartBodyBuilder; import org.springframework.http.client.reactive.ClientHttpRequest; import org.springframework.http.codec.HttpMessageWriter; import org.springframework.http.codec.ServerSentEvent; -import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -46,147 +44,124 @@ import org.springframework.util.MultiValueMap; * server-sent events, resources, etc. * * @author Arjen Poutsma + * @author Rossen Stoyanchev * @since 5.0 */ public abstract class BodyInserters { - private static final ResolvableType RESOURCE_TYPE = - ResolvableType.forClass(Resource.class); + private static final ResolvableType RESOURCE_TYPE = ResolvableType.forClass(Resource.class); - private static final ResolvableType SERVER_SIDE_EVENT_TYPE = - ResolvableType.forClass(ServerSentEvent.class); + private static final ResolvableType SSE_TYPE = ResolvableType.forClass(ServerSentEvent.class); - private static final ResolvableType FORM_TYPE = + private static final ResolvableType FORM_DATA_TYPE = ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class); - private static final ResolvableType MULTIPART_VALUE_TYPE = ResolvableType.forClassWithGenerics( + private static final ResolvableType MULTIPART_DATA_TYPE = ResolvableType.forClassWithGenerics( MultiValueMap.class, String.class, Object.class); - private static final BodyInserter EMPTY = - (response, context) -> response.setComplete(); + private static final BodyInserter EMPTY_INSERTER = + (response, context) -> response.setComplete(); /** - * Return an empty {@code BodyInserter} that writes nothing. - * @return an empty {@code BodyInserter} + * Inserter that does not write. + * @return the inserter */ @SuppressWarnings("unchecked") public static BodyInserter empty() { - return (BodyInserter)EMPTY; + return (BodyInserter) EMPTY_INSERTER; } /** - * Return a {@code BodyInserter} that writes the given single object. - *

Note also that + * Inserter to write the given object. + *

Alternatively, consider using the {@code syncBody(Object)} shortcuts on * {@link org.springframework.web.reactive.function.client.WebClient WebClient} and - * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse} - * each offer a {@code syncBody(Object)} shortcut for providing an Object - * as the body. - * @param body the body of the response - * @return a {@code BodyInserter} that writes a single object + * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse}. + * @param body the body to write to the response + * @param the type of the body + * @return the inserter to write a single object */ public static BodyInserter fromObject(T body) { - return bodyInserterFor(Mono.just(body), ResolvableType.forInstance(body)); + return (message, context) -> + writeWithMessageWriters(message, context, Mono.just(body), ResolvableType.forInstance(body)); } /** - * Return a {@code BodyInserter} that writes the given {@link Publisher}. - *

Note also that + * Inserter to write the given {@link Publisher}. + *

Alternatively, consider using the {@code body} shortcuts on * {@link org.springframework.web.reactive.function.client.WebClient WebClient} and - * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse} - * each offer {@code body} shortcut methods for providing a Publisher as the body. - * @param publisher the publisher to stream to the response body - * @param elementClass the class of elements contained in the publisher + * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse}. + * @param publisher the publisher to write with + * @param elementClass the type of elements in the publisher * @param the type of the elements contained in the publisher - * @param

the type of the {@code Publisher} - * @return a {@code BodyInserter} that writes a {@code Publisher} + * @param

the {@code Publisher} type + * @return the inserter to write a {@code Publisher} */ public static > BodyInserter fromPublisher( P publisher, Class elementClass) { - return bodyInserterFor(publisher, ResolvableType.forClass(elementClass)); + return (message, context) -> + writeWithMessageWriters(message, context, publisher, ResolvableType.forClass(elementClass)); } /** - * Return a {@code BodyInserter} that writes the given {@link Publisher}. - *

Note also that + * Inserter to write the given {@link Publisher}. + *

Alternatively, consider using the {@code body} shortcuts on * {@link org.springframework.web.reactive.function.client.WebClient WebClient} and - * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse} - * each offer {@code body} shortcut methods for providing a Publisher as the body. - * @param publisher the publisher to stream to the response body - * @param typeReference the type of elements contained in the publisher + * {@link org.springframework.web.reactive.function.server.ServerResponse ServerResponse}. + * @param publisher the publisher to write with + * @param typeRef the type of elements contained in the publisher * @param the type of the elements contained in the publisher - * @param

the type of the {@code Publisher} - * @return a {@code BodyInserter} that writes a {@code Publisher} + * @param

the {@code Publisher} type + * @return the inserter to write a {@code Publisher} */ public static > BodyInserter fromPublisher( - P publisher, ParameterizedTypeReference typeReference) { + P publisher, ParameterizedTypeReference typeRef) { - return bodyInserterFor(publisher, ResolvableType.forType(typeReference.getType())); + return (message, context) -> + writeWithMessageWriters(message, context, publisher, ResolvableType.forType(typeRef.getType())); } /** - * Return a {@code BodyInserter} that writes the given {@code Resource}. + * Inserter to write the given {@code Resource}. *

If the resource can be resolved to a {@linkplain Resource#getFile() file}, it will * be copied using zero-copy. * @param resource the resource to write to the output message * @param the type of the {@code Resource} - * @return a {@code BodyInserter} that writes a {@code Publisher} + * @return the inserter to write a {@code Publisher} */ public static BodyInserter fromResource(T resource) { return (outputMessage, context) -> { - Mono inputStream = Mono.just(resource); - HttpMessageWriter messageWriter = resourceHttpMessageWriter(context); - Optional serverRequest = context.serverRequest(); - if (serverRequest.isPresent() && outputMessage instanceof ServerHttpResponse) { - return messageWriter.write(inputStream, RESOURCE_TYPE, RESOURCE_TYPE, null, - serverRequest.get(), (ServerHttpResponse) outputMessage, context.hints()); - } - else { - return messageWriter.write(inputStream, RESOURCE_TYPE, null, outputMessage, context.hints()); - } + ResolvableType elementType = RESOURCE_TYPE; + HttpMessageWriter writer = findWriter(context, elementType, null); + return write(Mono.just(resource), elementType, null, outputMessage, context, writer); }; } - private static HttpMessageWriter resourceHttpMessageWriter(BodyInserter.Context context) { - return context.messageWriters().stream() - .filter(messageWriter -> messageWriter.canWrite(RESOURCE_TYPE, null)) - .findFirst() - .map(BodyInserters::cast) - .orElseThrow(() -> new IllegalStateException( - "Could not find HttpMessageWriter that supports Resource objects")); - } - /** - * Return a {@code BodyInserter} that writes the given {@code ServerSentEvent} publisher. - *

Note that a SSE {@code BodyInserter} can also be obtained by passing a stream of strings - * or POJOs (to be encoded as JSON) to {@link #fromPublisher(Publisher, Class)}, and specifying a - * {@link MediaType#TEXT_EVENT_STREAM text/event-stream} Content-Type. + * Inserter to write the given {@code ServerSentEvent} publisher. + *

Alternatively, you can provide event data objects via + * {@link #fromPublisher(Publisher, Class)}, and set the "Content-Type" to + * {@link MediaType#TEXT_EVENT_STREAM text/event-stream}. * @param eventsPublisher the {@code ServerSentEvent} publisher to write to the response body - * @param the type of the elements contained in the {@link ServerSentEvent} - * @return a {@code BodyInserter} that writes a {@code ServerSentEvent} publisher + * @param the type of the data elements in the {@link ServerSentEvent} + * @return the inserter to write a {@code ServerSentEvent} publisher * @see Server-Sent Events W3C recommendation */ - // Note that the returned BodyInserter is parameterized to ServerHttpResponse, not - // ReactiveHttpOutputMessage like other methods, since sending SSEs only typically happens on - // the server-side + // Parameterized for server-side use public static >> BodyInserter fromServerSentEvents( S eventsPublisher) { return (serverResponse, context) -> { - HttpMessageWriter> messageWriter = - findMessageWriter(context, SERVER_SIDE_EVENT_TYPE, MediaType.TEXT_EVENT_STREAM); - return context.serverRequest() - .map(serverRequest -> messageWriter.write(eventsPublisher, SERVER_SIDE_EVENT_TYPE, - SERVER_SIDE_EVENT_TYPE, MediaType.TEXT_EVENT_STREAM, serverRequest, - serverResponse, context.hints())) - .orElseGet(() -> messageWriter.write(eventsPublisher, SERVER_SIDE_EVENT_TYPE, - MediaType.TEXT_EVENT_STREAM, serverResponse, context.hints())); + ResolvableType elmentType = SSE_TYPE; + MediaType mediaType = MediaType.TEXT_EVENT_STREAM; + HttpMessageWriter> writer = findWriter(context, elmentType, mediaType); + return write(eventsPublisher, elmentType, mediaType, serverResponse, context, writer); }; } /** - * Return a {@link FormInserter} that writes the given {@code MultiValueMap} + * Return a {@link FormInserter} to write the given {@code MultiValueMap} * as URL-encoded form data. The returned inserter allows for additional * entries to be added via {@link FormInserter#with(String, Object)}. * @@ -204,7 +179,7 @@ public abstract class BodyInserters { } /** - * Return a {@link FormInserter} that writes the given key-value pair as + * Return a {@link FormInserter} to write the given key-value pair as * URL-encoded form data. The returned inserter allows for additional * entries to be added via {@link FormInserter#with(String, Object)}. * @param name the key to add to the form @@ -218,7 +193,7 @@ public abstract class BodyInserters { } /** - * Return a {@link MultipartInserter} that writes the given + * Return a {@link MultipartInserter} to write the given * {@code MultiValueMap} as multipart data. Values in the map can be an * Object or an {@link HttpEntity}. *

Note that you can also build the multipart data externally with @@ -234,7 +209,7 @@ public abstract class BodyInserters { } /** - * Return a {@link MultipartInserter} that writes the given parts, + * Return a {@link MultipartInserter} to write the given parts, * as multipart data. Values in the map can be an Object or an * {@link HttpEntity}. *

Note that you can also build the multipart data externally with @@ -251,7 +226,7 @@ public abstract class BodyInserters { } /** - * Return a {@link MultipartInserter} that writes the given asynchronous parts, + * Return a {@link MultipartInserter} to write the given asynchronous parts, * as multipart data. *

Note that you can also build the multipart data externally with * {@link MultipartBodyBuilder}, and pass the resulting map directly to the @@ -286,11 +261,11 @@ public abstract class BodyInserters { } /** - * Return a {@code BodyInserter} that writes the given + * Inserter to write the given * {@code Publisher} to the body. * @param publisher the data buffer publisher to write * @param the type of the publisher - * @return a {@code BodyInserter} that writes directly to the body + * @return the inserter to write directly to the body * @see ReactiveHttpOutputMessage#writeWith(Publisher) */ public static > BodyInserter fromDataBuffers( @@ -301,47 +276,49 @@ public abstract class BodyInserters { } - private static , M extends ReactiveHttpOutputMessage> BodyInserter bodyInserterFor( - P body, ResolvableType bodyType) { + private static

, M extends ReactiveHttpOutputMessage> Mono writeWithMessageWriters( + M outputMessage, BodyInserter.Context context, P body, ResolvableType bodyType) { - return (outputMessage, context) -> { - MediaType contentType = outputMessage.getHeaders().getContentType(); - List> messageWriters = context.messageWriters(); - return messageWriters.stream() - .filter(messageWriter -> messageWriter.canWrite(bodyType, contentType)) - .findFirst() - .map(BodyInserters::cast) - .map(messageWriter -> { - Optional serverRequest = context.serverRequest(); - if (serverRequest.isPresent() && outputMessage instanceof ServerHttpResponse) { - return messageWriter.write(body, bodyType, bodyType, contentType, - serverRequest.get(), (ServerHttpResponse) outputMessage, - context.hints()); - } - else { - return messageWriter.write(body, bodyType, contentType, outputMessage, context.hints()); - } - }) - .orElseGet(() -> { - List supportedMediaTypes = messageWriters.stream() - .flatMap(reader -> reader.getWritableMediaTypes().stream()) - .collect(Collectors.toList()); - UnsupportedMediaTypeException error = - new UnsupportedMediaTypeException(contentType, supportedMediaTypes, bodyType); - return Mono.error(error); - }); - }; + MediaType mediaType = outputMessage.getHeaders().getContentType(); + return context.messageWriters().stream() + .filter(messageWriter -> messageWriter.canWrite(bodyType, mediaType)) + .findFirst() + .map(BodyInserters::cast) + .map(writer -> write(body, bodyType, mediaType, outputMessage, context, writer)) + .orElseGet(() -> Mono.error(unsupportedError(bodyType, context, mediaType))); } - private static HttpMessageWriter findMessageWriter( - BodyInserter.Context context, ResolvableType type, MediaType mediaType) { + private static UnsupportedMediaTypeException unsupportedError(ResolvableType bodyType, + BodyInserter.Context context, @Nullable MediaType mediaType) { + + List supportedMediaTypes = context.messageWriters().stream() + .flatMap(reader -> reader.getWritableMediaTypes().stream()) + .collect(Collectors.toList()); + + return new UnsupportedMediaTypeException(mediaType, supportedMediaTypes, bodyType); + } + + private static Mono write(Publisher input, ResolvableType type, + @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, + BodyInserter.Context context, HttpMessageWriter writer) { + + return context.serverRequest() + .map(request -> { + ServerHttpResponse response = (ServerHttpResponse) message; + return writer.write(input, type, type, mediaType, request, response, context.hints()); + }) + .orElseGet(() -> writer.write(input, type, mediaType, message, context.hints())); + } + + private static HttpMessageWriter findWriter( + BodyInserter.Context context, ResolvableType elementType, @Nullable MediaType mediaType) { return context.messageWriters().stream() - .filter(messageWriter -> messageWriter.canWrite(type, mediaType)) + .filter(messageWriter -> messageWriter.canWrite(elementType, mediaType)) .findFirst() .map(BodyInserters::cast) .orElseThrow(() -> new IllegalStateException( - "Could not find HttpMessageWriter that supports " + mediaType)); + "No HttpMessageWriter for \"" + mediaType + "\" and \"" + elementType + "\"")); } @SuppressWarnings("unchecked") @@ -425,8 +402,8 @@ public abstract class BodyInserters { @Override public Mono insert(ClientHttpRequest outputMessage, Context context) { HttpMessageWriter> messageWriter = - findMessageWriter(context, FORM_TYPE, MediaType.APPLICATION_FORM_URLENCODED); - return messageWriter.write(Mono.just(this.data), FORM_TYPE, + findWriter(context, FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED); + return messageWriter.write(Mono.just(this.data), FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED, outputMessage, context.hints()); } @@ -477,9 +454,9 @@ public abstract class BodyInserters { @Override public Mono insert(ClientHttpRequest outputMessage, Context context) { HttpMessageWriter>> messageWriter = - findMessageWriter(context, MULTIPART_VALUE_TYPE, MediaType.MULTIPART_FORM_DATA); + findWriter(context, MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA); MultiValueMap> body = this.builder.build(); - return messageWriter.write(Mono.just(body), MULTIPART_VALUE_TYPE, + return messageWriter.write(Mono.just(body), MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA, outputMessage, context.hints()); }