diff --git a/build.gradle b/build.gradle
index bdc16ee0dc..d31ee6d629 100644
--- a/build.gradle
+++ b/build.gradle
@@ -747,6 +747,7 @@ project("spring-web") {
optional("javax.xml.bind:jaxb-api:${jaxbVersion}")
optional("javax.xml.ws:jaxws-api:${jaxwsVersion}")
optional("javax.mail:javax.mail-api:${javamailVersion}")
+ optional("org.synchronoss.cloud:nio-multipart-parser:1.0.2")
optional("org.jetbrains.kotlin:kotlin-stdlib-jre8:${kotlinVersion}")
testCompile(project(":spring-context-support")) // for JafMediaTypeFactory
testCompile("io.projectreactor.addons:reactor-test")
diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageReader.java
new file mode 100644
index 0000000000..8725bcb6fc
--- /dev/null
+++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageReader.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2002-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.multipart;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import reactor.core.publisher.Flux;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.http.MediaType;
+import org.springframework.http.ReactiveHttpInputMessage;
+import org.springframework.http.codec.HttpMessageReader;
+import org.springframework.util.MultiValueMap;
+
+/**
+ * Interface for reading multipart HTML forms with {@code "multipart/form-data"} media
+ * type in accordance with RFC 7578.
+ *
+ * @author Sebastien Deleuze
+ * @since 5.0
+ */
+public interface MultipartHttpMessageReader extends HttpMessageReader> {
+
+ ResolvableType MULTIPART_VALUE_TYPE =
+ ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class);
+
+ @Override
+ default List getReadableMediaTypes() {
+ return Collections.singletonList(MediaType.MULTIPART_FORM_DATA);
+ }
+
+ @Override
+ default boolean canRead(ResolvableType elementType, MediaType mediaType) {
+ return MULTIPART_VALUE_TYPE.isAssignableFrom(elementType) &&
+ (mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType));
+ }
+
+ @Override
+ default Flux> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map hints) {
+ return Flux.from(readMono(elementType, message, hints));
+ }
+}
diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/Part.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/Part.java
new file mode 100644
index 0000000000..2c65f58557
--- /dev/null
+++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/Part.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2002-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.multipart;
+
+import java.io.File;
+import java.util.Optional;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.HttpHeaders;
+
+/**
+ * A representation of a part received in a multipart request. Could contain a file, the
+ * string or json value of a parameter.
+ *
+ * @author Sebastien Deleuze
+ * @since 5.0
+ */
+public interface Part {
+
+ /**
+ * @return the headers of this part
+ */
+ HttpHeaders getHeaders();
+
+ /**
+ * @return the name of the parameter in the multipart form
+ */
+ String getName();
+
+ /**
+ * @return optionally the filename if the part contains a file
+ */
+ Optional getFilename();
+
+ /**
+ * @return the content of the part as a String using the charset specified in the
+ * {@code Content-Type} header if any, or else using {@code UTF-8} by default.
+ */
+ Mono getContentAsString();
+
+ /**
+ * @return the content of the part as a stream of bytes
+ */
+ Flux getContent();
+
+ /**
+ * Transfer the file contained in this part to the specified destination.
+ * @param dest the destination file
+ * @return a {@link Mono} that indicates completion of the file transfer or an error,
+ * for example an {@link IllegalStateException} if the part does not contain a file
+ */
+ Mono transferTo(File dest);
+
+}
diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReader.java
new file mode 100644
index 0000000000..47aa5a76eb
--- /dev/null
+++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReader.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2002-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.multipart;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.synchronoss.cloud.nio.multipart.Multipart;
+import org.synchronoss.cloud.nio.multipart.MultipartContext;
+import org.synchronoss.cloud.nio.multipart.MultipartUtils;
+import org.synchronoss.cloud.nio.multipart.NioMultipartParser;
+import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener;
+import org.synchronoss.cloud.nio.stream.storage.StreamStorage;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ReactiveHttpInputMessage;
+import org.springframework.http.codec.HttpMessageReader;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.util.StreamUtils;
+
+/**
+ * Implementation of {@link HttpMessageReader} to read multipart HTML
+ * forms with {@code "multipart/form-data"} media type in accordance
+ * with RFC 7578 based
+ * on the Synchronoss NIO Multipart library.
+ *
+ * @author Sebastien Deleuze
+ * @since 5.0
+ * @see Synchronoss NIO Multipart
+ */
+public class SynchronossMultipartHttpMessageReader implements MultipartHttpMessageReader {
+
+ @Override
+ public Mono> readMono(ResolvableType elementType, ReactiveHttpInputMessage inputMessage, Map hints) {
+
+ return Flux.create(new NioMultipartConsumer(inputMessage))
+ .collectMultimap(part -> part.getName())
+ .map(partsMap -> new LinkedMultiValueMap<>(partsMap
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ entry -> entry.getKey(),
+ entry -> new ArrayList<>(entry.getValue()))
+ )));
+ }
+
+
+ private static class NioMultipartConsumer implements Consumer> {
+
+ private final ReactiveHttpInputMessage inputMessage;
+
+
+ public NioMultipartConsumer(ReactiveHttpInputMessage inputMessage) {
+ this.inputMessage = inputMessage;
+ }
+
+
+ @Override
+ public void accept(FluxSink emitter) {
+ HttpHeaders headers = inputMessage.getHeaders();
+ MultipartContext context = new MultipartContext(
+ headers.getContentType().toString(),
+ Math.toIntExact(headers.getContentLength()),
+ headers.getFirst(HttpHeaders.ACCEPT_CHARSET));
+ NioMultipartParserListener listener = new ReactiveNioMultipartParserListener(emitter);
+ NioMultipartParser parser = Multipart.multipart(context).forNIO(listener);
+
+ inputMessage.getBody().subscribe(buffer -> {
+ byte[] resultBytes = new byte[buffer.readableByteCount()];
+ buffer.read(resultBytes);
+ try {
+ parser.write(resultBytes);
+ }
+ catch (IOException ex) {
+ listener.onError("Exception thrown while closing the parser", ex);
+ }
+
+ }, (e) -> {
+ try {
+ listener.onError("Exception thrown while reading the request body", e);
+ parser.close();
+ }
+ catch (IOException ex) {
+ listener.onError("Exception thrown while closing the parser", ex);
+ }
+ }, () -> {
+ try {
+ parser.close();
+ }
+ catch (IOException ex) {
+ listener.onError("Exception thrown while closing the parser", ex);
+ }
+ });
+
+ }
+
+ private static class ReactiveNioMultipartParserListener implements NioMultipartParserListener {
+
+ private FluxSink emitter;
+
+ private final AtomicInteger errorCount = new AtomicInteger(0);
+
+
+ public ReactiveNioMultipartParserListener(FluxSink emitter) {
+ this.emitter = emitter;
+ }
+
+
+ @Override
+ public void onPartFinished(StreamStorage streamStorage, Map> headersFromPart) {
+ HttpHeaders headers = new HttpHeaders();
+ headers.putAll(headersFromPart);
+ emitter.next(new NioPart(headers, streamStorage));
+ }
+
+ @Override
+ public void onFormFieldPartFinished(String fieldName, String fieldValue, Map> headersFromPart) {
+ HttpHeaders headers = new HttpHeaders();
+ headers.putAll(headersFromPart);
+ emitter.next(new NioPart(headers, fieldValue));
+ }
+
+ @Override
+ public void onAllPartsFinished() {
+ emitter.complete();
+ }
+
+ @Override
+ public void onNestedPartStarted(Map> headersFromParentPart) {
+ }
+
+ @Override
+ public void onNestedPartFinished() {
+ }
+
+ @Override
+ public void onError(String message, Throwable cause) {
+ if (errorCount.getAndIncrement() == 1) {
+ emitter.error(new RuntimeException(message, cause));
+ }
+ }
+
+ }
+ }
+
+ /**
+ * {@link Part} implementation based on the NIO Multipart library.
+ */
+ private static class NioPart implements Part {
+
+ private final HttpHeaders headers;
+
+ private final StreamStorage streamStorage;
+
+ private final String content;
+
+ private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
+
+
+ public NioPart(HttpHeaders headers, StreamStorage streamStorage) {
+ this.headers = headers;
+ this.streamStorage = streamStorage;
+ this.content = null;
+ }
+
+ public NioPart(HttpHeaders headers, String content) {
+ this.headers = headers;
+ this.streamStorage = null;
+ this.content = content;
+ }
+
+
+ @Override
+ public String getName() {
+ return MultipartUtils.getFieldName(headers);
+ }
+
+ @Override
+ public HttpHeaders getHeaders() {
+ return this.headers;
+ }
+
+ @Override
+ public Optional getFilename() {
+ return Optional.ofNullable(MultipartUtils.getFileName(this.headers));
+ }
+
+ @Override
+ public Mono transferTo(File dest) {
+ if (!getFilename().isPresent()) {
+ return Mono.error(new IllegalStateException("The part does not contain a file."));
+ }
+ try {
+ InputStream inputStream = this.streamStorage.getInputStream();
+ // Get a FileChannel when possible in order to use zero copy mechanism
+ ReadableByteChannel inChannel = Channels.newChannel(inputStream);
+ FileChannel outChannel = new FileOutputStream(dest).getChannel();
+ // NIO Multipart has previously limited the size of the content
+ long count = (inChannel instanceof FileChannel ? ((FileChannel)inChannel).size() : Long.MAX_VALUE);
+ long result = outChannel.transferFrom(inChannel, 0, count);
+ if (result < count) {
+ return Mono.error(new IOException(
+ "Could only write " + result + " out of " + count + " bytes"));
+ }
+ }
+ catch (IOException ex) {
+ return Mono.error(ex);
+ }
+ return Mono.empty();
+ }
+
+ @Override
+ public Mono getContentAsString() {
+ if (this.content != null) {
+ return Mono.just(this.content);
+ }
+ MediaType contentType = this.headers.getContentType();
+ Charset charset = (contentType.getCharset() == null ? StandardCharsets.UTF_8 : contentType.getCharset());
+ try {
+ return Mono.just(StreamUtils.copyToString(this.streamStorage.getInputStream(), charset));
+ }
+ catch (IOException e) {
+ return Mono.error(new IllegalStateException("Error while reading part content as a string", e));
+ }
+ }
+
+ @Override
+ public Flux getContent() {
+ if (this.content != null) {
+ DataBuffer buffer = this.bufferFactory.allocateBuffer(this.content.length());
+ buffer.write(this.content.getBytes());
+ return Flux.just(buffer);
+ }
+ InputStream inputStream = this.streamStorage.getInputStream();
+ return DataBufferUtils.read(inputStream, this.bufferFactory, 4096);
+ }
+
+ }
+}
diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageReaderTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageReaderTests.java
new file mode 100644
index 0000000000..6d312d995c
--- /dev/null
+++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageReaderTests.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2002-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.multipart;
+
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.http.MediaType;
+import org.springframework.util.MultiValueMap;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author Sebastien Deleuze
+ */
+public class MultipartHttpMessageReaderTests {
+
+ private MultipartHttpMessageReader reader;
+
+ @Before
+ public void setUp() throws Exception {
+ this.reader = (elementType, message, hints) -> {
+ throw new UnsupportedOperationException();
+ };
+ }
+
+ @Test
+ public void canRead() {
+ assertTrue(this.reader.canRead(
+ ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class),
+ MediaType.MULTIPART_FORM_DATA));
+
+ assertFalse(this.reader.canRead(
+ ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Object.class),
+ MediaType.MULTIPART_FORM_DATA));
+
+ assertFalse(this.reader.canRead(
+ ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class),
+ MediaType.MULTIPART_FORM_DATA));
+
+ assertFalse(this.reader.canRead(
+ ResolvableType.forClassWithGenerics(Map.class, String.class, String.class),
+ MediaType.MULTIPART_FORM_DATA));
+
+ assertFalse(this.reader.canRead(
+ ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class),
+ MediaType.APPLICATION_FORM_URLENCODED));
+ }
+
+}
diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReaderTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReaderTests.java
new file mode 100644
index 0000000000..180ebbf807
--- /dev/null
+++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReaderTests.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2002-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.multipart;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static java.util.Collections.emptyMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import static org.springframework.http.HttpHeaders.CONTENT_LENGTH;
+import static org.springframework.http.HttpHeaders.CONTENT_TYPE;
+import static org.springframework.http.MediaType.MULTIPART_FORM_DATA;
+import static org.springframework.http.codec.multipart.MultipartHttpMessageReader.*;
+
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.MockHttpOutputMessage;
+import org.springframework.http.converter.FormHttpMessageConverter;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+/**
+ * @author Sebastien Deleuze
+ */
+public class SynchronossMultipartHttpMessageReaderTests {
+
+ @Test
+ public void resolveParts() throws IOException {
+ ServerHttpRequest request = generateMultipartRequest();
+ MultipartHttpMessageReader multipartReader = new SynchronossMultipartHttpMessageReader();
+ MultiValueMap parts = multipartReader.readMono(MULTIPART_VALUE_TYPE, request, emptyMap()).block();
+ assertEquals(2, parts.size());
+
+ assertTrue(parts.containsKey("fooPart"));
+ Part part = parts.getFirst("fooPart");
+ assertEquals("fooPart", part.getName());
+ Optional filename = part.getFilename();
+ assertTrue(filename.isPresent());
+ assertEquals("foo.txt", filename.get());
+ DataBuffer buffer = part
+ .getContent()
+ .reduce((s1, s2) -> s1.write(s2))
+ .block();
+ assertEquals(12, buffer.readableByteCount());
+ byte[] byteContent = new byte[12];
+ buffer.read(byteContent);
+ assertEquals("Lorem\nIpsum\n", new String(byteContent));
+
+ assertTrue(parts.containsKey("barPart"));
+ part = parts.getFirst("barPart");
+ assertEquals("barPart", part.getName());
+ filename = part.getFilename();
+ assertFalse(filename.isPresent());
+ assertEquals("bar", part.getContentAsString().block());
+ }
+
+ @Test
+ public void bodyError() {
+ ServerHttpRequest request = generateErrorMultipartRequest();
+ MultipartHttpMessageReader multipartReader = new SynchronossMultipartHttpMessageReader();
+ StepVerifier.create(multipartReader.readMono(MULTIPART_VALUE_TYPE, request, emptyMap()))
+ .verifyError();
+ }
+
+ private ServerHttpRequest generateMultipartRequest() throws IOException {
+ HttpHeaders fooHeaders = new HttpHeaders();
+ fooHeaders.setContentType(MediaType.TEXT_PLAIN);
+ ClassPathResource fooResource = new ClassPathResource("org/springframework/http/codec/multipart/foo.txt");
+ HttpEntity fooPart = new HttpEntity<>(fooResource, fooHeaders);
+ HttpEntity barPart = new HttpEntity<>("bar");
+ FormHttpMessageConverter converter = new FormHttpMessageConverter();
+ MockHttpOutputMessage outputMessage = new MockHttpOutputMessage();
+ MultiValueMap parts = new LinkedMultiValueMap<>();
+ parts.add("fooPart", fooPart);
+ parts.add("barPart", barPart);
+ converter.write(parts, MULTIPART_FORM_DATA, outputMessage);
+ byte[] content = outputMessage.getBodyAsBytes();
+ MockServerHttpRequest request = MockServerHttpRequest
+ .post("/foo")
+ .header(CONTENT_TYPE, outputMessage.getHeaders().getContentType().toString())
+ .header(CONTENT_LENGTH, String.valueOf(content.length))
+ .body(new String(content));
+ return request;
+ }
+
+ private ServerHttpRequest generateErrorMultipartRequest() {
+ DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
+ MockServerHttpRequest request = MockServerHttpRequest
+ .post("/foo")
+ .header(CONTENT_TYPE, MULTIPART_FORM_DATA.toString())
+ .body(Flux.just(bufferFactory.wrap("invalid content".getBytes())));
+ return request;
+ }
+
+}
diff --git a/spring-web/src/test/resources/org/springframework/http/codec/multipart/foo.txt b/spring-web/src/test/resources/org/springframework/http/codec/multipart/foo.txt
new file mode 100644
index 0000000000..08123a7d4d
--- /dev/null
+++ b/spring-web/src/test/resources/org/springframework/http/codec/multipart/foo.txt
@@ -0,0 +1,2 @@
+Lorem
+Ipsum