Add reading reactive multipart request support

This commit introduces reactive multipart support by adding a new
MultipartHttpMessageReader interface (with default methods) and a
SynchronossMultipartHttpMessageReader implementation based on
the Synchronoss NIO Multipart implementation
(https://github.com/synchronoss/nio-multipart).

Issue: SPR-14546
This commit is contained in:
Sebastien Deleuze 2017-04-27 11:45:25 +02:00
parent 7f1fa225fe
commit 23e4dd6d3d
7 changed files with 601 additions and 0 deletions

View File

@ -747,6 +747,7 @@ project("spring-web") {
optional("javax.xml.bind:jaxb-api:${jaxbVersion}")
optional("javax.xml.ws:jaxws-api:${jaxwsVersion}")
optional("javax.mail:javax.mail-api:${javamailVersion}")
optional("org.synchronoss.cloud:nio-multipart-parser:1.0.2")
optional("org.jetbrains.kotlin:kotlin-stdlib-jre8:${kotlinVersion}")
testCompile(project(":spring-context-support")) // for JafMediaTypeFactory
testCompile("io.projectreactor.addons:reactor-test")

View File

@ -0,0 +1,58 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.multipart;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.util.MultiValueMap;
/**
* Interface for reading multipart HTML forms with {@code "multipart/form-data"} media
* type in accordance with <a href="https://tools.ietf.org/html/rfc7578">RFC 7578</a>.
*
* @author Sebastien Deleuze
* @since 5.0
*/
public interface MultipartHttpMessageReader extends HttpMessageReader<MultiValueMap<String, Part>> {
ResolvableType MULTIPART_VALUE_TYPE =
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class);
@Override
default List<MediaType> getReadableMediaTypes() {
return Collections.singletonList(MediaType.MULTIPART_FORM_DATA);
}
@Override
default boolean canRead(ResolvableType elementType, MediaType mediaType) {
return MULTIPART_VALUE_TYPE.isAssignableFrom(elementType) &&
(mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType));
}
@Override
default Flux<MultiValueMap<String, Part>> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
return Flux.from(readMono(elementType, message, hints));
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.multipart;
import java.io.File;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
/**
* A representation of a part received in a multipart request. Could contain a file, the
* string or json value of a parameter.
*
* @author Sebastien Deleuze
* @since 5.0
*/
public interface Part {
/**
* @return the headers of this part
*/
HttpHeaders getHeaders();
/**
* @return the name of the parameter in the multipart form
*/
String getName();
/**
* @return optionally the filename if the part contains a file
*/
Optional<String> getFilename();
/**
* @return the content of the part as a String using the charset specified in the
* {@code Content-Type} header if any, or else using {@code UTF-8} by default.
*/
Mono<String> getContentAsString();
/**
* @return the content of the part as a stream of bytes
*/
Flux<DataBuffer> getContent();
/**
* Transfer the file contained in this part to the specified destination.
* @param dest the destination file
* @return a {@link Mono} that indicates completion of the file transfer or an error,
* for example an {@link IllegalStateException} if the part does not contain a file
*/
Mono<Void> transferTo(File dest);
}

View File

@ -0,0 +1,277 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.multipart;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.synchronoss.cloud.nio.multipart.Multipart;
import org.synchronoss.cloud.nio.multipart.MultipartContext;
import org.synchronoss.cloud.nio.multipart.MultipartUtils;
import org.synchronoss.cloud.nio.multipart.NioMultipartParser;
import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener;
import org.synchronoss.cloud.nio.stream.storage.StreamStorage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StreamUtils;
/**
* Implementation of {@link HttpMessageReader} to read multipart HTML
* forms with {@code "multipart/form-data"} media type in accordance
* with <a href="https://tools.ietf.org/html/rfc7578">RFC 7578</a> based
* on the Synchronoss NIO Multipart library.
*
* @author Sebastien Deleuze
* @since 5.0
* @see <a href="https://github.com/synchronoss/nio-multipart">Synchronoss NIO Multipart</a>
*/
public class SynchronossMultipartHttpMessageReader implements MultipartHttpMessageReader {
@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType, ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
return Flux.create(new NioMultipartConsumer(inputMessage))
.collectMultimap(part -> part.getName())
.map(partsMap -> new LinkedMultiValueMap<>(partsMap
.entrySet()
.stream()
.collect(Collectors.toMap(
entry -> entry.getKey(),
entry -> new ArrayList<>(entry.getValue()))
)));
}
private static class NioMultipartConsumer implements Consumer<FluxSink<Part>> {
private final ReactiveHttpInputMessage inputMessage;
public NioMultipartConsumer(ReactiveHttpInputMessage inputMessage) {
this.inputMessage = inputMessage;
}
@Override
public void accept(FluxSink<Part> emitter) {
HttpHeaders headers = inputMessage.getHeaders();
MultipartContext context = new MultipartContext(
headers.getContentType().toString(),
Math.toIntExact(headers.getContentLength()),
headers.getFirst(HttpHeaders.ACCEPT_CHARSET));
NioMultipartParserListener listener = new ReactiveNioMultipartParserListener(emitter);
NioMultipartParser parser = Multipart.multipart(context).forNIO(listener);
inputMessage.getBody().subscribe(buffer -> {
byte[] resultBytes = new byte[buffer.readableByteCount()];
buffer.read(resultBytes);
try {
parser.write(resultBytes);
}
catch (IOException ex) {
listener.onError("Exception thrown while closing the parser", ex);
}
}, (e) -> {
try {
listener.onError("Exception thrown while reading the request body", e);
parser.close();
}
catch (IOException ex) {
listener.onError("Exception thrown while closing the parser", ex);
}
}, () -> {
try {
parser.close();
}
catch (IOException ex) {
listener.onError("Exception thrown while closing the parser", ex);
}
});
}
private static class ReactiveNioMultipartParserListener implements NioMultipartParserListener {
private FluxSink<Part> emitter;
private final AtomicInteger errorCount = new AtomicInteger(0);
public ReactiveNioMultipartParserListener(FluxSink<Part> emitter) {
this.emitter = emitter;
}
@Override
public void onPartFinished(StreamStorage streamStorage, Map<String, List<String>> headersFromPart) {
HttpHeaders headers = new HttpHeaders();
headers.putAll(headersFromPart);
emitter.next(new NioPart(headers, streamStorage));
}
@Override
public void onFormFieldPartFinished(String fieldName, String fieldValue, Map<String, List<String>> headersFromPart) {
HttpHeaders headers = new HttpHeaders();
headers.putAll(headersFromPart);
emitter.next(new NioPart(headers, fieldValue));
}
@Override
public void onAllPartsFinished() {
emitter.complete();
}
@Override
public void onNestedPartStarted(Map<String, List<String>> headersFromParentPart) {
}
@Override
public void onNestedPartFinished() {
}
@Override
public void onError(String message, Throwable cause) {
if (errorCount.getAndIncrement() == 1) {
emitter.error(new RuntimeException(message, cause));
}
}
}
}
/**
* {@link Part} implementation based on the NIO Multipart library.
*/
private static class NioPart implements Part {
private final HttpHeaders headers;
private final StreamStorage streamStorage;
private final String content;
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
public NioPart(HttpHeaders headers, StreamStorage streamStorage) {
this.headers = headers;
this.streamStorage = streamStorage;
this.content = null;
}
public NioPart(HttpHeaders headers, String content) {
this.headers = headers;
this.streamStorage = null;
this.content = content;
}
@Override
public String getName() {
return MultipartUtils.getFieldName(headers);
}
@Override
public HttpHeaders getHeaders() {
return this.headers;
}
@Override
public Optional<String> getFilename() {
return Optional.ofNullable(MultipartUtils.getFileName(this.headers));
}
@Override
public Mono<Void> transferTo(File dest) {
if (!getFilename().isPresent()) {
return Mono.error(new IllegalStateException("The part does not contain a file."));
}
try {
InputStream inputStream = this.streamStorage.getInputStream();
// Get a FileChannel when possible in order to use zero copy mechanism
ReadableByteChannel inChannel = Channels.newChannel(inputStream);
FileChannel outChannel = new FileOutputStream(dest).getChannel();
// NIO Multipart has previously limited the size of the content
long count = (inChannel instanceof FileChannel ? ((FileChannel)inChannel).size() : Long.MAX_VALUE);
long result = outChannel.transferFrom(inChannel, 0, count);
if (result < count) {
return Mono.error(new IOException(
"Could only write " + result + " out of " + count + " bytes"));
}
}
catch (IOException ex) {
return Mono.error(ex);
}
return Mono.empty();
}
@Override
public Mono<String> getContentAsString() {
if (this.content != null) {
return Mono.just(this.content);
}
MediaType contentType = this.headers.getContentType();
Charset charset = (contentType.getCharset() == null ? StandardCharsets.UTF_8 : contentType.getCharset());
try {
return Mono.just(StreamUtils.copyToString(this.streamStorage.getInputStream(), charset));
}
catch (IOException e) {
return Mono.error(new IllegalStateException("Error while reading part content as a string", e));
}
}
@Override
public Flux<DataBuffer> getContent() {
if (this.content != null) {
DataBuffer buffer = this.bufferFactory.allocateBuffer(this.content.length());
buffer.write(this.content.getBytes());
return Flux.just(buffer);
}
InputStream inputStream = this.streamStorage.getInputStream();
return DataBufferUtils.read(inputStream, this.bufferFactory, 4096);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.multipart;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.util.MultiValueMap;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author Sebastien Deleuze
*/
public class MultipartHttpMessageReaderTests {
private MultipartHttpMessageReader reader;
@Before
public void setUp() throws Exception {
this.reader = (elementType, message, hints) -> {
throw new UnsupportedOperationException();
};
}
@Test
public void canRead() {
assertTrue(this.reader.canRead(
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class),
MediaType.MULTIPART_FORM_DATA));
assertFalse(this.reader.canRead(
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Object.class),
MediaType.MULTIPART_FORM_DATA));
assertFalse(this.reader.canRead(
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class),
MediaType.MULTIPART_FORM_DATA));
assertFalse(this.reader.canRead(
ResolvableType.forClassWithGenerics(Map.class, String.class, String.class),
MediaType.MULTIPART_FORM_DATA));
assertFalse(this.reader.canRead(
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class),
MediaType.APPLICATION_FORM_URLENCODED));
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.multipart;
import java.io.IOException;
import java.util.Optional;
import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import static org.springframework.http.HttpHeaders.CONTENT_LENGTH;
import static org.springframework.http.HttpHeaders.CONTENT_TYPE;
import static org.springframework.http.MediaType.MULTIPART_FORM_DATA;
import static org.springframework.http.codec.multipart.MultipartHttpMessageReader.*;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.MockHttpOutputMessage;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* @author Sebastien Deleuze
*/
public class SynchronossMultipartHttpMessageReaderTests {
@Test
public void resolveParts() throws IOException {
ServerHttpRequest request = generateMultipartRequest();
MultipartHttpMessageReader multipartReader = new SynchronossMultipartHttpMessageReader();
MultiValueMap<String, Part> parts = multipartReader.readMono(MULTIPART_VALUE_TYPE, request, emptyMap()).block();
assertEquals(2, parts.size());
assertTrue(parts.containsKey("fooPart"));
Part part = parts.getFirst("fooPart");
assertEquals("fooPart", part.getName());
Optional<String> filename = part.getFilename();
assertTrue(filename.isPresent());
assertEquals("foo.txt", filename.get());
DataBuffer buffer = part
.getContent()
.reduce((s1, s2) -> s1.write(s2))
.block();
assertEquals(12, buffer.readableByteCount());
byte[] byteContent = new byte[12];
buffer.read(byteContent);
assertEquals("Lorem\nIpsum\n", new String(byteContent));
assertTrue(parts.containsKey("barPart"));
part = parts.getFirst("barPart");
assertEquals("barPart", part.getName());
filename = part.getFilename();
assertFalse(filename.isPresent());
assertEquals("bar", part.getContentAsString().block());
}
@Test
public void bodyError() {
ServerHttpRequest request = generateErrorMultipartRequest();
MultipartHttpMessageReader multipartReader = new SynchronossMultipartHttpMessageReader();
StepVerifier.create(multipartReader.readMono(MULTIPART_VALUE_TYPE, request, emptyMap()))
.verifyError();
}
private ServerHttpRequest generateMultipartRequest() throws IOException {
HttpHeaders fooHeaders = new HttpHeaders();
fooHeaders.setContentType(MediaType.TEXT_PLAIN);
ClassPathResource fooResource = new ClassPathResource("org/springframework/http/codec/multipart/foo.txt");
HttpEntity<ClassPathResource> fooPart = new HttpEntity<>(fooResource, fooHeaders);
HttpEntity<String> barPart = new HttpEntity<>("bar");
FormHttpMessageConverter converter = new FormHttpMessageConverter();
MockHttpOutputMessage outputMessage = new MockHttpOutputMessage();
MultiValueMap<String, Object> parts = new LinkedMultiValueMap<>();
parts.add("fooPart", fooPart);
parts.add("barPart", barPart);
converter.write(parts, MULTIPART_FORM_DATA, outputMessage);
byte[] content = outputMessage.getBodyAsBytes();
MockServerHttpRequest request = MockServerHttpRequest
.post("/foo")
.header(CONTENT_TYPE, outputMessage.getHeaders().getContentType().toString())
.header(CONTENT_LENGTH, String.valueOf(content.length))
.body(new String(content));
return request;
}
private ServerHttpRequest generateErrorMultipartRequest() {
DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
MockServerHttpRequest request = MockServerHttpRequest
.post("/foo")
.header(CONTENT_TYPE, MULTIPART_FORM_DATA.toString())
.body(Flux.just(bufferFactory.wrap("invalid content".getBytes())));
return request;
}
}

View File

@ -0,0 +1,2 @@
Lorem
Ipsum