Schedule blocking ResourceHttpMessageWriter operations on bounded elastic

This commit schedules blocking I/O operations on the bounded elastic
scheduler, which includes retrieving the content length and writing
the resource (region).

Closes gh-30928
This commit is contained in:
Arjen Poutsma 2023-08-28 12:47:20 +02:00
parent 3ccbe3d1cb
commit 76c032cc11
2 changed files with 84 additions and 55 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2022 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,12 +20,12 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Hints; import org.springframework.core.codec.Hints;
@ -116,39 +116,61 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> {
private Mono<Void> writeResource(Resource resource, ResolvableType type, @Nullable MediaType mediaType, private Mono<Void> writeResource(Resource resource, ResolvableType type, @Nullable MediaType mediaType,
ReactiveHttpOutputMessage message, Map<String, Object> hints) { ReactiveHttpOutputMessage message, Map<String, Object> hints) {
addHeaders(message, resource, mediaType, hints); return addDefaultHeaders(message, resource, mediaType, hints)
.then(Mono.defer(() -> {
return zeroCopy(resource, null, message, hints) Mono<Void> result = zeroCopy(resource, null, message, hints);
.orElseGet(() -> { if (result != null) {
Mono<Resource> input = Mono.just(resource); return result;
DataBufferFactory factory = message.bufferFactory();
Flux<DataBuffer> body = this.encoder.encode(input, factory, type, message.getHeaders().getContentType(), hints);
if (logger.isDebugEnabled()) {
body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger));
} }
return message.writeWith(body); else {
}); Mono<Resource> input = Mono.just(resource);
DataBufferFactory factory = message.bufferFactory();
Flux<DataBuffer> body = this.encoder.encode(input, factory, type, message.getHeaders().getContentType(), hints)
.subscribeOn(Schedulers.boundedElastic());
if (logger.isDebugEnabled()) {
body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger));
}
return message.writeWith(body);
}
}));
} }
/** /**
* Adds the default headers for the given resource to the given message. * Adds the default headers for the given resource to the given message.
* @since 6.0 * @since 6.0
* @deprecated since 6.1, in favor of {@link #addDefaultHeaders(ReactiveHttpOutputMessage, Resource, MediaType, Map)},
* for removal = 6.2
*/ */
@Deprecated(since = "6.1", forRemoval = true)
public void addHeaders(ReactiveHttpOutputMessage message, Resource resource, @Nullable MediaType contentType, Map<String, Object> hints) { public void addHeaders(ReactiveHttpOutputMessage message, Resource resource, @Nullable MediaType contentType, Map<String, Object> hints) {
HttpHeaders headers = message.getHeaders(); addDefaultHeaders(message, resource, contentType, hints).block();
MediaType resourceMediaType = getResourceMediaType(contentType, resource, hints); }
headers.setContentType(resourceMediaType);
if (headers.getContentLength() < 0) { /**
long length = lengthOf(resource); * Adds the default headers for the given resource to the given message.
if (length != -1) { * @since 6.1
headers.setContentLength(length); */
public Mono<Void> addDefaultHeaders(ReactiveHttpOutputMessage message, Resource resource, @Nullable MediaType contentType, Map<String, Object> hints) {
return Mono.defer(() -> {
HttpHeaders headers = message.getHeaders();
MediaType resourceMediaType = getResourceMediaType(contentType, resource, hints);
headers.setContentType(resourceMediaType);
if (message instanceof ServerHttpResponse) {
// server side
headers.set(HttpHeaders.ACCEPT_RANGES, "bytes");
} }
}
if (message instanceof ServerHttpResponse) { if (headers.getContentLength() < 0) {
// server side return lengthOf(resource)
headers.set(HttpHeaders.ACCEPT_RANGES, "bytes"); .flatMap(contentLength -> {
} headers.setContentLength(contentLength);
return Mono.empty();
});
}
else {
return Mono.empty();
}
});
} }
private static MediaType getResourceMediaType( private static MediaType getResourceMediaType(
@ -164,19 +186,21 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> {
return mediaType; return mediaType;
} }
private static long lengthOf(Resource resource) { private static Mono<Long> lengthOf(Resource resource) {
// Don't consume InputStream... // Don't consume InputStream...
if (InputStreamResource.class != resource.getClass()) { if (InputStreamResource.class != resource.getClass()) {
try { return Mono.fromCallable(resource::contentLength)
return resource.contentLength(); .filter(length -> length != -1)
} .onErrorResume(IOException.class, t -> Mono.empty())
catch (IOException ignored) { .subscribeOn(Schedulers.boundedElastic());
} }
else {
return Mono.empty();
} }
return -1;
} }
private static Optional<Mono<Void>> zeroCopy(Resource resource, @Nullable ResourceRegion region, @Nullable
private static Mono<Void> zeroCopy(Resource resource, @Nullable ResourceRegion region,
ReactiveHttpOutputMessage message, Map<String, Object> hints) { ReactiveHttpOutputMessage message, Map<String, Object> hints) {
if (message instanceof ZeroCopyHttpOutputMessage zeroCopyHttpOutputMessage && resource.isFile()) { if (message instanceof ZeroCopyHttpOutputMessage zeroCopyHttpOutputMessage && resource.isFile()) {
@ -188,13 +212,13 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> {
String formatted = region != null ? "region " + pos + "-" + (count) + " of " : ""; String formatted = region != null ? "region " + pos + "-" + (count) + " of " : "";
logger.debug(Hints.getLogPrefix(hints) + "Zero-copy " + formatted + "[" + resource + "]"); logger.debug(Hints.getLogPrefix(hints) + "Zero-copy " + formatted + "[" + resource + "]");
} }
return Optional.of(zeroCopyHttpOutputMessage.writeWith(file, pos, count)); return zeroCopyHttpOutputMessage.writeWith(file, pos, count);
} }
catch (IOException ex) { catch (IOException ex) {
// should not happen // should not happen
} }
} }
return Optional.empty(); return null;
} }
@ -227,15 +251,16 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> {
if (regions.size() == 1){ if (regions.size() == 1){
ResourceRegion region = regions.get(0); ResourceRegion region = regions.get(0);
headers.setContentType(resourceMediaType); headers.setContentType(resourceMediaType);
long contentLength = lengthOf(resource); return lengthOf(resource)
if (contentLength != -1) { .flatMap(contentLength -> {
long start = region.getPosition(); long start = region.getPosition();
long end = start + region.getCount() - 1; long end = start + region.getCount() - 1;
end = Math.min(end, contentLength - 1); end = Math.min(end, contentLength - 1);
headers.add("Content-Range", "bytes " + start + '-' + end + '/' + contentLength); headers.add("Content-Range", "bytes " + start + '-' + end + '/' + contentLength);
headers.setContentLength(end - start + 1); headers.setContentLength(end - start + 1);
} return Mono.empty();
return writeSingleRegion(region, response, hints); })
.then(writeSingleRegion(region, response, hints));
} }
else { else {
String boundary = MimeTypeUtils.generateMultipartBoundaryString(); String boundary = MimeTypeUtils.generateMultipartBoundaryString();
@ -250,19 +275,23 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> {
private Mono<Void> writeSingleRegion(ResourceRegion region, ReactiveHttpOutputMessage message, private Mono<Void> writeSingleRegion(ResourceRegion region, ReactiveHttpOutputMessage message,
Map<String, Object> hints) { Map<String, Object> hints) {
return zeroCopy(region.getResource(), region, message, hints) Mono<Void> result = zeroCopy(region.getResource(), region, message, hints);
.orElseGet(() -> { if (result != null) {
Publisher<? extends ResourceRegion> input = Mono.just(region); return result;
MediaType mediaType = message.getHeaders().getContentType(); }
return encodeAndWriteRegions(input, mediaType, message, hints); else {
}); Publisher<? extends ResourceRegion> input = Mono.just(region);
MediaType mediaType = message.getHeaders().getContentType();
return encodeAndWriteRegions(input, mediaType, message, hints);
}
} }
private Mono<Void> encodeAndWriteRegions(Publisher<? extends ResourceRegion> publisher, private Mono<Void> encodeAndWriteRegions(Publisher<? extends ResourceRegion> publisher,
@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) { @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
Flux<DataBuffer> body = this.regionEncoder.encode( Flux<DataBuffer> body = this.regionEncoder
publisher, message.bufferFactory(), REGION_TYPE, mediaType, hints); .encode(publisher, message.bufferFactory(), REGION_TYPE, mediaType,hints)
.subscribeOn(Schedulers.boundedElastic());
return message.writeWith(body); return message.writeWith(body);
} }

View File

@ -437,9 +437,9 @@ public class ResourceWebHandler implements WebHandler, InitializingBean {
ResourceHttpMessageWriter writer = getResourceHttpMessageWriter(); ResourceHttpMessageWriter writer = getResourceHttpMessageWriter();
Assert.state(writer != null, "No ResourceHttpMessageWriter"); Assert.state(writer != null, "No ResourceHttpMessageWriter");
if (HttpMethod.HEAD == httpMethod) { if (HttpMethod.HEAD == httpMethod) {
writer.addHeaders(exchange.getResponse(), resource, mediaType, return writer.addDefaultHeaders(exchange.getResponse(), resource, mediaType,
Hints.from(Hints.LOG_PREFIX_HINT, exchange.getLogPrefix())); Hints.from(Hints.LOG_PREFIX_HINT, exchange.getLogPrefix()))
return exchange.getResponse().setComplete(); .then(exchange.getResponse().setComplete());
} }
else { else {
return writer.write(Mono.just(resource), return writer.write(Mono.just(resource),