Merge branch '5.3.x'
This commit is contained in:
commit
a71f0eb267
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Copyright 2002-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.codec.multipart;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
|
||||
/**
|
||||
* Part content abstraction used by {@link DefaultParts}.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @since 5.3.13
|
||||
*/
|
||||
abstract class Content {
|
||||
|
||||
|
||||
protected Content() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the content.
|
||||
*/
|
||||
public abstract Flux<DataBuffer> content();
|
||||
|
||||
/**
|
||||
* Delete this content. Default implementation does nothing.
|
||||
*/
|
||||
public Mono<Void> delete() {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@code Content} based on the given flux of data buffers.
|
||||
*/
|
||||
public static Content fromFlux(Flux<DataBuffer> content) {
|
||||
return new FluxContent(content);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new {@code Content} based on the given file path.
|
||||
*/
|
||||
public static Content fromFile(Path file, Scheduler scheduler) {
|
||||
return new FileContent(file, scheduler);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@code Content} implementation based on a flux of data buffers.
|
||||
*/
|
||||
private static final class FluxContent extends Content {
|
||||
|
||||
private final Flux<DataBuffer> content;
|
||||
|
||||
|
||||
public FluxContent(Flux<DataBuffer> content) {
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> content() {
|
||||
return this.content;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@code Content} implementation based on a file.
|
||||
*/
|
||||
private static final class FileContent extends Content {
|
||||
|
||||
private final Path file;
|
||||
|
||||
private final Scheduler scheduler;
|
||||
|
||||
|
||||
public FileContent(Path file, Scheduler scheduler) {
|
||||
this.file = file;
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> content() {
|
||||
return DataBufferUtils.readByteChannel(
|
||||
() -> Files.newByteChannel(this.file, StandardOpenOption.READ),
|
||||
DefaultDataBufferFactory.sharedInstance, 1024)
|
||||
.subscribeOn(this.scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> delete() {
|
||||
return Mono.<Void>fromRunnable(() -> {
|
||||
try {
|
||||
Files.delete(this.file);
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new UncheckedIOException(ex);
|
||||
}
|
||||
})
|
||||
.subscribeOn(this.scheduler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -57,7 +57,7 @@ abstract class DefaultParts {
|
|||
* @param content the content of the part
|
||||
* @return {@link Part} or {@link FilePart}, depending on {@link HttpHeaders#getContentDisposition()}
|
||||
*/
|
||||
public static Part part(HttpHeaders headers, Flux<DataBuffer> content) {
|
||||
public static Part part(HttpHeaders headers, Content content) {
|
||||
Assert.notNull(headers, "Headers must not be null");
|
||||
Assert.notNull(content, "Content must not be null");
|
||||
|
||||
|
|
@ -142,16 +142,21 @@ abstract class DefaultParts {
|
|||
*/
|
||||
private static class DefaultPart extends AbstractPart {
|
||||
|
||||
private final Flux<DataBuffer> content;
|
||||
private final Content content;
|
||||
|
||||
public DefaultPart(HttpHeaders headers, Flux<DataBuffer> content) {
|
||||
public DefaultPart(HttpHeaders headers, Content content) {
|
||||
super(headers);
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> content() {
|
||||
return this.content;
|
||||
return this.content.content();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> delete() {
|
||||
return this.content.delete();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -171,9 +176,9 @@ abstract class DefaultParts {
|
|||
/**
|
||||
* Default implementation of {@link FilePart}.
|
||||
*/
|
||||
private static class DefaultFilePart extends DefaultPart implements FilePart {
|
||||
private static final class DefaultFilePart extends DefaultPart implements FilePart {
|
||||
|
||||
public DefaultFilePart(HttpHeaders headers, Flux<DataBuffer> content) {
|
||||
public DefaultFilePart(HttpHeaders headers, Content content) {
|
||||
super(headers, content);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.http.codec.multipart;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
|
|
@ -57,4 +58,13 @@ public interface Part {
|
|||
*/
|
||||
Flux<DataBuffer> content();
|
||||
|
||||
/**
|
||||
* Return a mono that, when subscribed to, deletes the underlying storage
|
||||
* for this part.
|
||||
* @since 5.3.13
|
||||
*/
|
||||
default Mono<Void> delete() {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> {
|
|||
requestToken();
|
||||
}
|
||||
});
|
||||
emitPart(DefaultParts.part(headers, streamingContent));
|
||||
emitPart(DefaultParts.part(headers, Content.fromFlux(streamingContent)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -518,7 +518,7 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> {
|
|||
}
|
||||
this.content.clear();
|
||||
Flux<DataBuffer> content = Flux.just(DefaultDataBufferFactory.sharedInstance.wrap(bytes));
|
||||
emitPart(DefaultParts.part(this.headers, content));
|
||||
emitPart(DefaultParts.part(this.headers, Content.fromFlux(content)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -674,21 +674,13 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> {
|
|||
@Override
|
||||
public void partComplete(boolean finalPart) {
|
||||
MultipartUtils.closeChannel(this.channel);
|
||||
Flux<DataBuffer> content = partContent();
|
||||
emitPart(DefaultParts.part(this.headers, content));
|
||||
emitPart(DefaultParts.part(this.headers,
|
||||
Content.fromFile(this.file, PartGenerator.this.blockingOperationScheduler)));
|
||||
if (finalPart) {
|
||||
emitComplete();
|
||||
}
|
||||
}
|
||||
|
||||
private Flux<DataBuffer> partContent() {
|
||||
return DataBufferUtils
|
||||
.readByteChannel(
|
||||
() -> Files.newByteChannel(this.file, StandardOpenOption.READ),
|
||||
DefaultDataBufferFactory.sharedInstance, 1024)
|
||||
.subscribeOn(PartGenerator.this.blockingOperationScheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
if (this.closeOnDispose) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue