Add Flux<Part> ServerRequest.parts()

This commit introduces Flux<Part> ServerRequest.parts() that delegates
to ServerWebExchange.getParts() and offers an alternative,
streaming way of accessing multipart data.

Closes gh-23131
This commit is contained in:
Arjen Poutsma 2019-06-14 10:33:42 +02:00
parent 11c7907a59
commit 92981ac9de
8 changed files with 133 additions and 25 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -249,6 +249,13 @@ public final class MockServerRequest implements ServerRequest {
return (Mono<MultiValueMap<String, Part>>) this.body;
}
@Override
@SuppressWarnings("unchecked")
public Flux<Part> parts() {
Assert.state(this.body != null, "No body");
return (Flux<Part>) this.body;
}
@Override
public ServerWebExchange exchange() {
Assert.state(this.exchange != null, "No exchange");

View File

@ -220,6 +220,11 @@ class DefaultServerRequest implements ServerRequest {
return this.exchange.getMultipartData();
}
@Override
public Flux<Part> parts() {
return this.exchange.getParts();
}
private ServerHttpRequest request() {
return this.exchange.getRequest();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -290,8 +290,7 @@ class DefaultServerRequestBuilder implements ServerRequest.Builder {
private static final ResolvableType FORM_DATA_TYPE =
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class);
private static final ResolvableType MULTIPART_DATA_TYPE = ResolvableType.forClassWithGenerics(
MultiValueMap.class, String.class, Part.class);
private static final ResolvableType PARTS_DATA_TYPE = ResolvableType.forClass(Part.class);
private static final Mono<MultiValueMap<String, String>> EMPTY_FORM_DATA =
Mono.just(CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<String, String>(0))).cache();
@ -307,13 +306,16 @@ class DefaultServerRequestBuilder implements ServerRequest.Builder {
private final Mono<MultiValueMap<String, Part>> multipartDataMono;
private final Flux<Part> parts;
public DelegatingServerWebExchange(
ServerHttpRequest request, ServerWebExchange delegate, List<HttpMessageReader<?>> messageReaders) {
this.request = request;
this.delegate = delegate;
this.formDataMono = initFormData(request, messageReaders);
this.multipartDataMono = initMultipartData(request, messageReaders);
this.parts = initParts(request, messageReaders);
this.multipartDataMono = initMultipartData(this.parts);
}
@SuppressWarnings("unchecked")
@ -339,26 +341,32 @@ class DefaultServerRequestBuilder implements ServerRequest.Builder {
}
@SuppressWarnings("unchecked")
private static Mono<MultiValueMap<String, Part>> initMultipartData(ServerHttpRequest request,
List<HttpMessageReader<?>> readers) {
private static Flux<Part> initParts(ServerHttpRequest request, List<HttpMessageReader<?>> readers) {
try {
MediaType contentType = request.getHeaders().getContentType();
if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
return ((HttpMessageReader<MultiValueMap<String, Part>>) readers.stream()
.filter(reader -> reader.canRead(MULTIPART_DATA_TYPE, MediaType.MULTIPART_FORM_DATA))
return ((HttpMessageReader<Part>)readers.stream()
.filter(reader -> reader.canRead(PARTS_DATA_TYPE, MediaType.MULTIPART_FORM_DATA))
.findFirst()
.orElseThrow(() -> new IllegalStateException("No multipart HttpMessageReader.")))
.readMono(MULTIPART_DATA_TYPE, request, Hints.none())
.switchIfEmpty(EMPTY_MULTIPART_DATA)
.cache();
.read(PARTS_DATA_TYPE, request, Hints.none());
}
}
catch (InvalidMediaTypeException ex) {
// Ignore
}
return EMPTY_MULTIPART_DATA;
return Flux.empty();
}
private static Mono<MultiValueMap<String, Part>> initMultipartData(Flux<Part> parts) {
return parts.collect(
() -> (MultiValueMap<String, Part>) new LinkedMultiValueMap<String, Part>(),
(map, part) -> map.add(part.name(), part))
.switchIfEmpty(EMPTY_MULTIPART_DATA)
.cache();
}
@Override
public ServerHttpRequest getRequest() {
return this.request;
@ -374,6 +382,11 @@ class DefaultServerRequestBuilder implements ServerRequest.Builder {
return this.multipartDataMono;
}
@Override
public Flux<Part> getParts() {
return this.parts;
}
// Delegating methods
@Override

View File

@ -1025,6 +1025,11 @@ public abstract class RequestPredicates {
return this.request.multipartData();
}
@Override
public Flux<Part> parts() {
return this.request.parts();
}
@Override
public ServerWebExchange exchange() {
return this.request.exchange();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -270,9 +270,23 @@ public interface ServerRequest {
* <p><strong>Note:</strong> calling this method causes the request body to
* be read and parsed in full, and the resulting {@code MultiValueMap} is
* cached so that this method is safe to call more than once.
* <p><strong>Note:</strong>the {@linkplain Part#content() contents} of each
* part is not cached, and can only be read once.
*/
Mono<MultiValueMap<String, Part>> multipartData();
/**
* Get the parts of a multipart request if the Content-Type is
* {@code "multipart/form-data"} or an empty flux otherwise.
* <p><strong>Note:</strong> calling this method causes the request body to
* be read and parsed in full and the resulting {@code Flux} is
* cached so that this method is safe to call more than once.
* <p><strong>Note:</strong>the {@linkplain Part#content() contents} of each
* part is not cached, and can only be read once.
* @since 5.2
*/
Flux<Part> parts();
/**
* Get the web exchange that this request is based on.
* <p>Note: Manipulating the exchange directly (instead of using the methods provided on

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -208,6 +208,11 @@ public class ServerRequestWrapper implements ServerRequest {
return this.delegate.multipartData();
}
@Override
public Flux<Part> parts() {
return this.delegate.parts();
}
@Override
public ServerWebExchange exchange() {
return this.delegate.exchange();

View File

@ -16,6 +16,10 @@
package org.springframework.web.reactive.function;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import org.junit.Test;
@ -29,6 +33,7 @@ import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
@ -38,7 +43,7 @@ import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.assertj.core.api.Assertions.fail;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
/**
@ -48,6 +53,8 @@ public class MultipartIntegrationTests extends AbstractRouterFunctionIntegration
private final WebClient webClient = WebClient.create();
private ClassPathResource resource = new ClassPathResource("org/springframework/http/codec/multipart/foo.txt");
@Test
public void multipartData() {
@ -77,9 +84,33 @@ public class MultipartIntegrationTests extends AbstractRouterFunctionIntegration
.verifyComplete();
}
@Test
public void transferTo() {
Mono<String> result = webClient
.post()
.uri("http://localhost:" + this.port + "/transferTo")
.syncBody(generateBody())
.retrieve()
.bodyToMono(String.class);
StepVerifier
.create(result)
.consumeNextWith(location -> {
try {
byte[] actualBytes = Files.readAllBytes(Paths.get(location));
byte[] expectedBytes = FileCopyUtils.copyToByteArray(this.resource.getInputStream());
assertThat(actualBytes).isEqualTo(expectedBytes);
}
catch (IOException ex) {
fail("IOException", ex);
}
})
.verifyComplete();
}
private MultiValueMap<String, HttpEntity<?>> generateBody() {
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("fooPart", new ClassPathResource("org/springframework/http/codec/multipart/foo.txt"));
builder.part("fooPart", resource);
builder.part("barPart", "bar");
return builder.build();
}
@ -87,44 +118,65 @@ public class MultipartIntegrationTests extends AbstractRouterFunctionIntegration
@Override
protected RouterFunction<ServerResponse> routerFunction() {
MultipartHandler multipartHandler = new MultipartHandler();
return route(POST("/multipartData"), multipartHandler::multipartData)
.andRoute(POST("/parts"), multipartHandler::parts);
return route()
.POST("/multipartData", multipartHandler::multipartData)
.POST("/parts", multipartHandler::parts)
.POST("/transferTo", multipartHandler::transferTo)
.build();
}
private static class MultipartHandler {
public Mono<ServerResponse> multipartData(ServerRequest request) {
return request
.body(BodyExtractors.toMultipartData())
return request.multipartData()
.flatMap(map -> {
Map<String, Part> parts = map.toSingleValueMap();
try {
assertThat(parts.size()).isEqualTo(2);
assertThat(((FilePart) parts.get("fooPart")).filename()).isEqualTo("foo.txt");
assertThat(((FormFieldPart) parts.get("barPart")).value()).isEqualTo("bar");
return ServerResponse.ok().build();
}
catch(Exception e) {
return Mono.error(e);
}
return ServerResponse.ok().build();
});
}
public Mono<ServerResponse> parts(ServerRequest request) {
return request.body(BodyExtractors.toParts()).collectList()
return request.parts().collectList()
.flatMap(parts -> {
try {
assertThat(parts.size()).isEqualTo(2);
assertThat(((FilePart) parts.get(0)).filename()).isEqualTo("foo.txt");
assertThat(((FormFieldPart) parts.get(1)).value()).isEqualTo("bar");
return ServerResponse.ok().build();
}
catch(Exception e) {
return Mono.error(e);
}
return ServerResponse.ok().build();
});
}
public Mono<ServerResponse> transferTo(ServerRequest request) {
return request.parts()
.filter(part -> part instanceof FilePart)
.next()
.cast(FilePart.class)
.flatMap(part -> {
try {
Path tempFile = Files.createTempFile("MultipartIntegrationTests", null);
return part.transferTo(tempFile)
.then(ServerResponse.ok()
.syncBody(tempFile.toString()));
}
catch (Exception e) {
return Mono.error(e);
}
});
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -247,6 +247,13 @@ public class MockServerRequest implements ServerRequest {
return (Mono<MultiValueMap<String, Part>>) this.body;
}
@Override
@SuppressWarnings("unchecked")
public Flux<Part> parts() {
Assert.state(this.body != null, "No body");
return (Flux<Part>) this.body;
}
@Override
public ServerWebExchange exchange() {
Assert.state(this.exchange != null, "No exchange");