Add ResourceRegionHttpMessageWriter

This new `HttpMessageWriter` leverages the `ResourceRegionEncoder` to
write `ResourceRegion` to HTTP responses, thus supporting HTTP Range
requests.

Whenever possible, this message writer uses the zero copy support for
single range requests.

This `HttpMessageWriter` is never used directly, but is used as a
delegate by the `ResourceHttpMessageWriter`. When provided with the
`BOUNDARY_STRING_HINT`, the `ResourceRegionHttpMessageWriter`
adapts its behavior in order to write a single/multiple byte ranges.

Issue: SPR-14664
This commit is contained in:
Brian Clozel 2016-09-12 15:21:58 +02:00
parent 55d6f88dcd
commit a7a9e36ca0
4 changed files with 479 additions and 4 deletions

View File

@ -18,6 +18,9 @@ package org.springframework.http.codec;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -29,12 +32,19 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.codec.ResourceDecoder;
import org.springframework.core.codec.ResourceEncoder;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourceRegion;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRange;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.MediaTypeFactory;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ResourceUtils;
import org.springframework.web.server.ResponseStatusException;
/**
* Implementation of {@link HttpMessageWriter} that can write
@ -44,19 +54,42 @@ import org.springframework.util.ResourceUtils;
* {@link DecoderHttpMessageReader}.
*
* @author Arjen Poutsma
* @author Brian Clozel
* @since 5.0
*/
public class ResourceHttpMessageWriter extends EncoderHttpMessageWriter<Resource> {
public class ResourceHttpMessageWriter extends AbstractServerHttpMessageWriter<Resource> {
public static final String HTTP_RANGE_REQUEST_HINT = ResourceHttpMessageWriter.class.getName() + ".httpRange";
private ResourceRegionHttpMessageWriter resourceRegionHttpMessageWriter;
public ResourceHttpMessageWriter() {
super(new ResourceEncoder());
super(new EncoderHttpMessageWriter<>(new ResourceEncoder()));
this.resourceRegionHttpMessageWriter = new ResourceRegionHttpMessageWriter();
}
public ResourceHttpMessageWriter(int bufferSize) {
super(new ResourceEncoder(bufferSize));
super(new EncoderHttpMessageWriter<>(new ResourceEncoder(bufferSize)));
this.resourceRegionHttpMessageWriter = new ResourceRegionHttpMessageWriter(bufferSize);
}
@Override
protected Map<String, Object> beforeWrite(ResolvableType streamType, ResolvableType elementType,
MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response) {
try {
List<HttpRange> httpRanges = request.getHeaders().getRange();
if (!httpRanges.isEmpty()) {
response.setStatusCode(HttpStatus.PARTIAL_CONTENT);
return Collections.singletonMap(ResourceHttpMessageWriter.HTTP_RANGE_REQUEST_HINT, httpRanges);
}
}
catch (IllegalArgumentException ex) {
throw new ResponseStatusException(HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
"Could not parse Range request header", ex);
}
return Collections.emptyMap();
}
@Override
public Mono<Void> write(Publisher<? extends Resource> inputStream, ResolvableType elementType,
@ -71,6 +104,31 @@ public class ResourceHttpMessageWriter extends EncoderHttpMessageWriter<Resource
}));
}
@Override
@SuppressWarnings("unchecked")
public Mono<Void> write(Publisher<? extends Resource> inputStream, ResolvableType streamType,
ResolvableType elementType, MediaType mediaType, ServerHttpRequest request,
ServerHttpResponse response, Map<String, Object> hints) {
Map<String, Object> mergedHints = new HashMap<>(hints);
mergedHints.putAll(beforeWrite(streamType, elementType, mediaType, request, response));
if (mergedHints.containsKey(HTTP_RANGE_REQUEST_HINT)) {
List<HttpRange> httpRanges = (List<HttpRange>) mergedHints.get(HTTP_RANGE_REQUEST_HINT);
if (httpRanges.size() > 1) {
final String boundary = MimeTypeUtils.generateMultipartBoundaryString();
mergedHints.put(ResourceRegionHttpMessageWriter.BOUNDARY_STRING_HINT, boundary);
}
Flux<ResourceRegion> regions = Flux.from(inputStream)
.flatMap(resource -> Flux.fromIterable(HttpRange.toResourceRegions(httpRanges, resource)));
return this.resourceRegionHttpMessageWriter
.write(regions, ResolvableType.forClass(ResourceRegion.class), mediaType, response, mergedHints);
}
else {
return write(inputStream, elementType, mediaType, response, mergedHints);
}
}
protected void addHeaders(HttpHeaders headers, Resource resource, MediaType mediaType) {
if (headers.getContentType() == null) {
if (mediaType == null || !mediaType.isConcrete() ||
@ -85,7 +143,8 @@ public class ResourceHttpMessageWriter extends EncoderHttpMessageWriter<Resource
}
}
private Mono<Void> writeContent(Resource resource, ResolvableType type, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {
private Mono<Void> writeContent(Resource resource, ResolvableType type,
ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {
if (outputMessage instanceof ZeroCopyHttpOutputMessage) {
Optional<File> file = getFile(resource);
if (file.isPresent()) {

View File

@ -0,0 +1,123 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.ResourceRegionEncoder;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourceRegion;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.util.ResourceUtils;
/**
* Implementation of {@link HttpMessageWriter} that can write
* {@link ResourceRegion ResourceRegion}s.
*
* <p>Note that there is no {@link HttpMessageReader} counterpart.
*
* @author Brian Clozel
* @since 5.0
*/
class ResourceRegionHttpMessageWriter extends EncoderHttpMessageWriter<ResourceRegion> {
public static final String BOUNDARY_STRING_HINT = ResourceRegionHttpMessageWriter.class.getName() + ".boundaryString";
public ResourceRegionHttpMessageWriter() {
super(new ResourceRegionEncoder());
}
public ResourceRegionHttpMessageWriter(int bufferSize) {
super(new ResourceRegionEncoder(bufferSize));
}
@Override
public Mono<Void> write(Publisher<? extends ResourceRegion> inputStream, ResolvableType type,
MediaType contentType, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {
if (hints != null && hints.containsKey(BOUNDARY_STRING_HINT)) {
String boundary = (String) hints.get(BOUNDARY_STRING_HINT);
hints.put(ResourceRegionEncoder.BOUNDARY_STRING_HINT, boundary);
outputMessage.getHeaders()
.setContentType(MediaType.parseMediaType("multipart/byteranges;boundary=" + boundary));
return super.write(inputStream, type, contentType, outputMessage, hints);
}
else {
return Mono.from(inputStream)
.then(region -> {
writeSingleResourceRegionHeaders(region, contentType, outputMessage);
return writeResourceRegion(region, type, outputMessage);
});
}
}
private void writeSingleResourceRegionHeaders(ResourceRegion region, MediaType contentType,
ReactiveHttpOutputMessage outputMessage) {
OptionalLong resourceLength = ResourceUtils.contentLength(region.getResource());
resourceLength.ifPresent(length -> {
long start = region.getPosition();
long end = start + region.getCount() - 1;
end = Math.min(end, length - 1);
outputMessage.getHeaders().add("Content-Range", "bytes " + start + "-" + end + "/" + length);
outputMessage.getHeaders().setContentLength(end - start + 1);
});
outputMessage.getHeaders().setContentType(contentType);
}
private Mono<Void> writeResourceRegion(ResourceRegion region,
ResolvableType type, ReactiveHttpOutputMessage outputMessage) {
if (outputMessage instanceof ZeroCopyHttpOutputMessage) {
Optional<File> file = getFile(region.getResource());
if (file.isPresent()) {
ZeroCopyHttpOutputMessage zeroCopyResponse =
(ZeroCopyHttpOutputMessage) outputMessage;
return zeroCopyResponse.writeWith(file.get(), region.getPosition(), region.getCount());
}
}
// non-zero copy fallback, using ResourceRegionEncoder
return super.write(Mono.just(region), type,
outputMessage.getHeaders().getContentType(), outputMessage, Collections.emptyMap());
}
private static Optional<File> getFile(Resource resource) {
if (resource.isFile()) {
try {
return Optional.of(resource.getFile());
}
catch (IOException ex) {
// should not happen
}
}
return Optional.empty();
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRange;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse;
import org.springframework.tests.TestSubscriber;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.server.ResponseStatusException;
/**
* Unit tests for {@link ResourceHttpMessageWriter}.
* @author Brian Clozel
*/
public class ResourceHttpMessageWriterTests {
private ResourceHttpMessageWriter writer = new ResourceHttpMessageWriter();
private MockServerHttpRequest request = new MockServerHttpRequest();
private MockServerHttpResponse response = new MockServerHttpResponse();
private Resource resource;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp() throws Exception {
String content = "Spring Framework test resource content.";
this.resource = new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8));
}
@Test
public void writableMediaTypes() throws Exception {
assertThat(this.writer.getWritableMediaTypes(),
containsInAnyOrder(MimeTypeUtils.APPLICATION_OCTET_STREAM, MimeTypeUtils.ALL));
}
@Test
public void shouldWriteResource() throws Exception {
TestSubscriber.subscribe(this.writer.write(Mono.just(resource), null, ResolvableType.forClass(Resource.class),
MediaType.TEXT_PLAIN, this.request, this.response, Collections.emptyMap())).assertComplete();
assertThat(this.response.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN));
assertThat(this.response.getHeaders().getContentLength(), is(39L));
Mono<String> result = reduceToString(this.response.getBody(), this.response.bufferFactory());
TestSubscriber.subscribe(result).assertComplete().assertValues("Spring Framework test resource content.");
}
@Test
public void shouldWriteResourceRange() throws Exception {
this.request.getHeaders().setRange(Collections.singletonList(HttpRange.createByteRange(0, 5)));
TestSubscriber.subscribe(this.writer.write(Mono.just(resource), null, ResolvableType.forClass(Resource.class),
MediaType.TEXT_PLAIN, this.request, this.response, Collections.emptyMap())).assertComplete();
assertThat(this.response.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN));
assertThat(this.response.getHeaders().getFirst(HttpHeaders.CONTENT_RANGE), is("bytes 0-5/39"));
assertThat(this.response.getHeaders().getContentLength(), is(6L));
Mono<String> result = reduceToString(this.response.getBody(), this.response.bufferFactory());
TestSubscriber.subscribe(result).assertComplete().assertValues("Spring");
}
@Test
public void shouldThrowErrorInvalidRange() throws Exception {
this.request.getHeaders().set(HttpHeaders.RANGE, "invalid");
this.expectedException.expect(ResponseStatusException.class);
TestSubscriber.subscribe(this.writer.write(Mono.just(resource), null, ResolvableType.forClass(Resource.class),
MediaType.TEXT_PLAIN, this.request, this.response, Collections.emptyMap()));
this.expectedException.expect(Matchers.hasProperty("status", is(HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE)));
}
private Mono<String> reduceToString(Publisher<DataBuffer> buffers, DataBufferFactory bufferFactory) {
return Flux.from(buffers)
.reduce(bufferFactory.allocateBuffer(), (previous, current) -> {
previous.write(current);
DataBufferUtils.release(current);
return previous;
})
.map(buffer -> DataBufferTestUtils.dumpString(buffer, StandardCharsets.UTF_8));
}
}

View File

@ -0,0 +1,163 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import org.springframework.core.io.support.ResourceRegion;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse;
import org.springframework.tests.TestSubscriber;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
/**
* Unit tests for {@link ResourceRegionHttpMessageWriter}.
* @author Brian Clozel
*/
public class ResourceRegionHttpMessageWriterTests {
private ResourceRegionHttpMessageWriter writer = new ResourceRegionHttpMessageWriter();
private MockServerHttpResponse response = new MockServerHttpResponse();
private Resource resource;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp() throws Exception {
String content = "Spring Framework test resource content.";
this.resource = new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8));
}
@Test
public void defaultContentType() throws Exception {
assertEquals(MimeTypeUtils.APPLICATION_OCTET_STREAM,
this.writer.getDefaultContentType(ResolvableType.forClass(ResourceRegion.class)));
}
@Test
public void writableMediaTypes() throws Exception {
assertThat(this.writer.getWritableMediaTypes(),
containsInAnyOrder(MimeTypeUtils.APPLICATION_OCTET_STREAM, MimeTypeUtils.ALL));
}
@Test
public void shouldWriteResourceRegion() throws Exception {
ResourceRegion region = new ResourceRegion(this.resource, 0, 6);
TestSubscriber.subscribe(this.writer.write(Mono.just(region), ResolvableType.forClass(ResourceRegion.class),
MediaType.TEXT_PLAIN, this.response, Collections.emptyMap())).assertComplete();
assertThat(this.response.getHeaders().getContentType(), is(MediaType.TEXT_PLAIN));
assertThat(this.response.getHeaders().getFirst(HttpHeaders.CONTENT_RANGE), is("bytes 0-5/39"));
assertThat(this.response.getHeaders().getContentLength(), is(6L));
Mono<String> result = reduceToString(this.response.getBody(), this.response.bufferFactory());
TestSubscriber.subscribe(result).assertComplete().assertValues("Spring");
}
@Test
public void shouldWriteMultipleResourceRegions() throws Exception {
Flux<ResourceRegion> regions = Flux.just(
new ResourceRegion(this.resource, 0, 6),
new ResourceRegion(this.resource, 7, 9),
new ResourceRegion(this.resource, 17, 4),
new ResourceRegion(this.resource, 22, 17)
);
String boundary = MimeTypeUtils.generateMultipartBoundaryString();
Map<String, Object> hints = new HashMap<>(1);
hints.put(ResourceRegionHttpMessageWriter.BOUNDARY_STRING_HINT, boundary);
TestSubscriber.subscribe(
this.writer.write(regions, ResolvableType.forClass(ResourceRegion.class),
MediaType.TEXT_PLAIN, this.response, hints))
.assertComplete();
HttpHeaders headers = this.response.getHeaders();
assertThat(headers.getContentType().toString(), startsWith("multipart/byteranges;boundary=" + boundary));
Mono<String> result = reduceToString(this.response.getBody(), this.response.bufferFactory());
TestSubscriber
.subscribe(result).assertNoError()
.assertComplete()
.assertValuesWith(content -> {
String[] ranges = StringUtils.tokenizeToStringArray(content, "\r\n", false, true);
assertThat(ranges[0], is("--" + boundary));
assertThat(ranges[1], is("Content-Type: text/plain"));
assertThat(ranges[2], is("Content-Range: bytes 0-5/39"));
assertThat(ranges[3], is("Spring"));
assertThat(ranges[4], is("--" + boundary));
assertThat(ranges[5], is("Content-Type: text/plain"));
assertThat(ranges[6], is("Content-Range: bytes 7-15/39"));
assertThat(ranges[7], is("Framework"));
assertThat(ranges[8], is("--" + boundary));
assertThat(ranges[9], is("Content-Type: text/plain"));
assertThat(ranges[10], is("Content-Range: bytes 17-20/39"));
assertThat(ranges[11], is("test"));
assertThat(ranges[12], is("--" + boundary));
assertThat(ranges[13], is("Content-Type: text/plain"));
assertThat(ranges[14], is("Content-Range: bytes 22-38/39"));
assertThat(ranges[15], is("resource content."));
assertThat(ranges[16], is("--" + boundary + "--"));
});
}
private Mono<String> reduceToString(Publisher<DataBuffer> buffers, DataBufferFactory bufferFactory) {
return Flux.from(buffers)
.reduce(bufferFactory.allocateBuffer(), (previous, current) -> {
previous.write(current);
DataBufferUtils.release(current);
return previous;
})
.map(buffer -> DataBufferTestUtils.dumpString(buffer, StandardCharsets.UTF_8));
}
}