Document PartEvent API

Closes #29170
This commit is contained in:
Arjen Poutsma 2022-10-10 16:34:56 +02:00
parent cf2b1020f4
commit dedcb19f44
4 changed files with 186 additions and 27 deletions

View File

@ -35,9 +35,9 @@ import org.springframework.util.Assert;
* *
* Each part in a multipart HTTP message produces at least one * Each part in a multipart HTTP message produces at least one
* {@code PartEvent} containing both {@link #headers() headers} and a * {@code PartEvent} containing both {@link #headers() headers} and a
* {@linkplain PartEvent#content() buffer} with content of the part. * {@linkplain PartEvent#content() buffer} with the contents of the part.
* <ul> * <ul>
* <li>Form field will produce a <em>single</em> {@link FormPartEvent}, * <li>Form fields will produce a <em>single</em> {@link FormPartEvent},
* containing the {@linkplain FormPartEvent#value() value} of the field.</li> * containing the {@linkplain FormPartEvent#value() value} of the field.</li>
* <li>File uploads will produce <em>one or more</em> {@link FilePartEvent}s, * <li>File uploads will produce <em>one or more</em> {@link FilePartEvent}s,
* containing the {@linkplain FilePartEvent#filename() filename} used when * containing the {@linkplain FilePartEvent#filename() filename} used when
@ -65,12 +65,12 @@ import org.springframework.util.Assert;
* // handle form field * // handle form field
* } * }
* else if (event instanceof FilePartEvent fileEvent) { * else if (event instanceof FilePartEvent fileEvent) {
* String filename filename = fileEvent.filename(); * String filename = fileEvent.filename();
* Flux&lt;DataBuffer&gt; contents = partEvents.map(PartEvent::content); * Flux&lt;DataBuffer&gt; contents = partEvents.map(PartEvent::content);
* // handle file upload * // handle file upload
* } * }
* else { * else {
* return Mono.error("Unexpected event: " + event); * return Mono.error(new RuntimeException("Unexpected event: " + event));
* } * }
* } * }
* else { * else {
@ -103,7 +103,7 @@ import org.springframework.util.Assert;
* .post() * .post()
* .uri("https://example.com") * .uri("https://example.com")
* .body(Flux.concat( * .body(Flux.concat(
* FormEventPart.create("field", "field value"), * FormPartEvent.create("field", "field value"),
* FilePartEvent.create("file", resource) * FilePartEvent.create("file", resource)
* ), PartEvent.class) * ), PartEvent.class)
* .retrieve() * .retrieve()

View File

@ -40,7 +40,7 @@ as the following example shows:
PersonRepository repository = ... PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository); PersonHandler handler = new PersonHandler(repository);
RouterFunction<ServerResponse> route = route() RouterFunction<ServerResponse> route = route() <1>
.GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson) .GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson)
.GET("/person", accept(APPLICATION_JSON), handler::listPeople) .GET("/person", accept(APPLICATION_JSON), handler::listPeople)
.POST("/person", handler::createPerson) .POST("/person", handler::createPerson)
@ -64,6 +64,7 @@ as the following example shows:
} }
} }
---- ----
<1> Create router using `route()`.
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin .Kotlin
@ -202,21 +203,62 @@ Mono<MultiValueMap<String, Part>> map = request.multipartData();
val map = request.awaitMultipartData() val map = request.awaitMultipartData()
---- ----
The following example shows how to access multiparts, one at a time, in streaming fashion: The following example shows how to access multipart data, one at a time, in streaming fashion:
[source,java,role="primary"] [source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java .Java
---- ----
Flux<Part> parts = request.body(BodyExtractors.toParts()); Flux<PartEvent> allPartEvents = request.bodyToFlux(PartEvent.class);
allPartsEvents.windowUntil(PartEvent::isLast)
.concatMap(p -> p.switchOnFirst((signal, partEvents) -> {
if (signal.hasValue()) {
PartEvent event = signal.get();
if (event instanceof FormPartEvent formEvent) {
String value = formEvent.value();
// handle form field
}
else if (event instanceof FilePartEvent fileEvent) {
String filename = fileEvent.filename();
Flux<DataBuffer> contents = partEvents.map(PartEvent::content);
// handle file upload
}
else {
return Mono.error(new RuntimeException("Unexpected event: " + event));
}
}
else {
return partEvents; // either complete or error signal
}
}));
---- ----
[source,kotlin,role="secondary"]
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin .Kotlin
---- ----
val parts = request.body(BodyExtractors.toParts()).asFlow() val parts = request.bodyToFlux<PartEvent>()
allPartsEvents.windowUntil(PartEvent::isLast)
.concatMap {
it.switchOnFirst { signal, partEvents ->
if (signal.hasValue()) {
val event = signal.get()
if (event is FormPartEvent) {
val value: String = event.value();
// handle form field
} else if (event is FilePartEvent) {
val filename: String = event.filename();
val contents: Flux<DataBuffer> = partEvents.map(PartEvent::content);
// handle file upload
} else {
return Mono.error(RuntimeException("Unexpected event: " + event));
}
} else {
return partEvents; // either complete or error signal
}
}
}
}
---- ----
[[webflux-fn-response]] [[webflux-fn-response]]
=== ServerResponse === ServerResponse

View File

@ -859,6 +859,54 @@ inline-style, through the built-in `BodyInserters`, as the following example sho
.awaitBody<Unit>() .awaitBody<Unit>()
---- ----
==== `PartEvent`
To stream multipart data sequentially, you can provide multipart content through `PartEvent`
objects.
- Form fields can be created via `FormPartEvent::create`.
- File uploads can be created via `FilePartEvent::create`.
You can concatenate the streams returned from methods via `Flux::concat`, and create a request for
the `WebClient`.
For instance, this sample will POST a multipart form containing a form field and a file.
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
Resource resource = ...
Mono<String> result = webClient
.post()
.uri("https://example.com")
.body(Flux.concat(
FormPartEvent.create("field", "field value"),
FilePartEvent.create("file", resource)
), PartEvent.class)
.retrieve()
.bodyToMono(String.class);
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
var resource: Resource = ...
var result: Mono<String> = webClient
.post()
.uri("https://example.com")
.body(
Flux.concat(
FormPartEvent.create("field", "field value"),
FilePartEvent.create("file", resource)
)
)
.retrieve()
.bodyToMono()
----
On the server side, `PartEvent` objects that are received via `@RequestBody` or
`ServerRequest::bodyToFlux(PartEvent.class)` can be relayed to another service
via the `WebClient`.
[[webflux-client-filter]] [[webflux-client-filter]]

View File

@ -619,11 +619,11 @@ https://github.com/synchronoss/nio-multipart[Synchronoss NIO Multipart] library.
Both are configured through the `ServerCodecConfigurer` bean Both are configured through the `ServerCodecConfigurer` bean
(see the <<webflux-web-handler-api, Web Handler API>>). (see the <<webflux-web-handler-api, Web Handler API>>).
To parse multipart data in streaming fashion, you can use the `Flux<Part>` returned from an To parse multipart data in streaming fashion, you can use the `Flux<PartEvent>` returned from the
`HttpMessageReader<Part>` instead. For example, in an annotated controller, use of `PartEventHttpMessageReader` instead of using `@RequestPart`, as that implies `Map`-like access
`@RequestPart` implies `Map`-like access to individual parts by name and, hence, requires to individual parts by name and, hence, requires parsing multipart data in full.
parsing multipart data in full. By contrast, you can use `@RequestBody` to decode the By contrast, you can use `@RequestBody` to decode the content to `Flux<PartEvent>` without
content to `Flux<Part>` without collecting to a `MultiValueMap`. collecting to a `MultiValueMap`.
[[webflux-forwarded-headers]] [[webflux-forwarded-headers]]
@ -2825,29 +2825,98 @@ as the following example shows:
---- ----
<1> Using `@RequestBody`. <1> Using `@RequestBody`.
===== `PartEvent`
To access multipart data sequentially, in streaming fashion, you can use `@RequestBody` with To access multipart data sequentially, in a streaming fashion, you can use `@RequestBody` with
`Flux<Part>` (or `Flow<Part>` in Kotlin) instead, as the following example shows: `Flux<PartEvent>` (or `Flow<PartEvent>` in Kotlin).
Each part in a multipart HTTP message will produce at
least one `PartEvent` containing both headers and a buffer with the contents of the part.
- Form fields will produce a *single* `FormPartEvent`, containing the value of the field.
- File uploads will produce *one or more* `FilePartEvent` objects, containing the filename used
when uploading. If the file is large enough to be split across multiple buffers, the first
`FilePartEvent` will be followed by subsequent events.
For example:
[source,java,indent=0,subs="verbatim,quotes",role="primary"] [source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java .Java
---- ----
@PostMapping("/") @PostMapping("/")
public String handle(@RequestBody Flux<Part> parts) { <1> public void handle(@RequestBody Flux<PartEvent> allPartsEvents) { <1>
// ... allPartsEvents.windowUntil(PartEvent::isLast) <2>
.concatMap(p -> p.switchOnFirst((signal, partEvents) -> { <3>
if (signal.hasValue()) {
PartEvent event = signal.get();
if (event instanceof FormPartEvent formEvent) { <4>
String value = formEvent.value();
// handle form field
}
else if (event instanceof FilePartEvent fileEvent) { <5>
String filename = fileEvent.filename();
Flux<DataBuffer> contents = partEvents.map(PartEvent::content);
// handle file upload
}
else {
return Mono.error(new RuntimeException("Unexpected event: " + event));
}
}
else {
return partEvents; // either complete or error signal
}
}));
} }
---- ----
<1> Using `@RequestBody`. <1> Using `@RequestBody`.
<2> The final `PartEvent` for a particular part will have `isLast()` set to `true`, and can be
followed by additional events belonging to subsequent parts.
This makes the `isLast` property suitable as a predicate for the `Flux::windowUntil` operator, to
split events from all parts into windows that each belong to a single part.
<3> The `Flux::switchOnFirst` operator allows you to see whether you are handling a form field or
file upload.
<4> Handling the form field.
<5> Handling the file upload.
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin .Kotlin
---- ----
@PostMapping("/") @PostMapping("/")
fun handle(@RequestBody parts: Flow<Part>): String { // <1> fun handle(@RequestBody allPartsEvents: Flux<PartEvent>) = { // <1>
// ... allPartsEvents.windowUntil(PartEvent::isLast) <2>
.concatMap {
it.switchOnFirst { signal, partEvents -> <3>
if (signal.hasValue()) {
val event = signal.get()
if (event is FormPartEvent) { <4>
val value: String = event.value();
// handle form field
} else if (event is FilePartEvent) { <5>
val filename: String = event.filename();
val contents: Flux<DataBuffer> = partEvents.map(PartEvent::content);
// handle file upload
} else {
return Mono.error(RuntimeException("Unexpected event: " + event));
} }
} else {
return partEvents; // either complete or error signal
}
}
}
}
---- ----
<1> Using `@RequestBody`. <1> Using `@RequestBody`.
<2> The final `PartEvent` for a particular part will have `isLast()` set to `true`, and can be
followed by additional events belonging to subsequent parts.
This makes the `isLast` property suitable as a predicate for the `Flux::windowUntil` operator, to
split events from all parts into windows that each belong to a single part.
<3> The `Flux::switchOnFirst` operator allows you to see whether you are handling a form field or
file upload.
<4> Handling the form field.
<5> Handling the file upload.
Received part events can also be relayed to another service by using the `WebClient`.
See <<webflux-client-body-multipart>>.
[[webflux-ann-requestbody]] [[webflux-ann-requestbody]]