Add ResourceRegionEncoder

This commit adds the infrastructure necessary to support HTTP Range
requests. The new `ResourceRegionEncoder` can write `ResourceRegion`
objects as streams of bytes.

The `ResourceRegionEncoder` relies on an encoding hint,
`BOUNDARY_STRING_HINT`. If that hint is present, the encoder assumes
that multiple `ResourceRegion`s are to be encoded and uses the provided
boundary String to separate the ranges with MIME boundaries. If the
hint is absent, only a single resource region is encoded.
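
A minimal usage sketch, modeled on the tests added in this commit
(`resource` stands for any `Resource`; the boundary value and mime type
are arbitrary):

ResourceRegionEncoder encoder = new ResourceRegionEncoder();
DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
String boundary = MimeTypeUtils.generateMultipartBoundaryString();

// Multiple regions: the boundary hint is required and each region
// becomes its own MIME part
Flux<DataBuffer> parts = encoder.encode(
    Flux.just(new ResourceRegion(resource, 0, 6), new ResourceRegion(resource, 7, 9)),
    bufferFactory,
    ResolvableType.forClass(ResourceRegion.class),
    MimeType.valueOf("text/plain"),
    Collections.singletonMap(ResourceRegionEncoder.BOUNDARY_STRING_HINT, boundary));

// A single region wrapped in a Mono needs no boundary hint
Flux<DataBuffer> single = encoder.encode(
    Mono.just(new ResourceRegion(resource, 0, 6)),
    bufferFactory,
    ResolvableType.forClass(ResourceRegion.class),
    MimeTypeUtils.APPLICATION_OCTET_STREAM,
    Collections.emptyMap());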

Issue: SPR-14664
Author: Brian Clozel
Date: 2016-09-12 12:18:56 +02:00
Parent: 1d46b8d7e1
Commit: 55d6f88dcd
6 changed files with 385 additions and 16 deletions

ResourceRegionEncoder.java

@@ -0,0 +1,146 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.OptionalLong;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.support.ResourceRegion;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StreamUtils;
/**
* Encoder for {@link ResourceRegion}s.
*
* @author Brian Clozel
* @since 5.0
*/
public class ResourceRegionEncoder extends AbstractEncoder<ResourceRegion> {
public static final int DEFAULT_BUFFER_SIZE = StreamUtils.BUFFER_SIZE;
public static final String BOUNDARY_STRING_HINT = ResourceRegionEncoder.class.getName() + ".boundaryString";
private final int bufferSize;
public ResourceRegionEncoder() {
this(DEFAULT_BUFFER_SIZE);
}
public ResourceRegionEncoder(int bufferSize) {
super(MimeTypeUtils.APPLICATION_OCTET_STREAM, MimeTypeUtils.ALL);
Assert.isTrue(bufferSize > 0, "'bufferSize' must be larger than 0");
this.bufferSize = bufferSize;
}
@Override
public boolean canEncode(ResolvableType elementType, MimeType mimeType) {
return super.canEncode(elementType, mimeType)
&& ResourceRegion.class.isAssignableFrom(elementType.getRawClass());
}
@Override
public Flux<DataBuffer> encode(Publisher<? extends ResourceRegion> inputStream,
DataBufferFactory bufferFactory, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
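// A Mono input signals a single region: write its bytes directly, without multipart boundaries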
if (inputStream instanceof Mono) {
return ((Mono<? extends ResourceRegion>) inputStream)
.flatMap(region -> writeResourceRegion(region, bufferFactory));
}
else {
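// Multiple regions: the boundary hint is required, and each region is written as its own MIME part framed by the boundary string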
Assert.notNull(hints, "'hints' must not be null");
Assert.isTrue(hints.containsKey(BOUNDARY_STRING_HINT), "'hints' must contain boundaryString hint");
final String boundaryString = (String) hints.get(BOUNDARY_STRING_HINT);
byte[] startBoundary = getAsciiBytes("\r\n--" + boundaryString + "\r\n");
byte[] contentType = getAsciiBytes("Content-Type: " + mimeType.toString() + "\r\n");
Flux<DataBuffer> regions = Flux.from(inputStream).
concatMap(region ->
Flux.concat(
getRegionPrefix(bufferFactory, startBoundary, contentType, region),
writeResourceRegion(region, bufferFactory)
));
return Flux.concat(regions, getRegionSuffix(bufferFactory, boundaryString));
}
}
private Flux<DataBuffer> getRegionPrefix(DataBufferFactory bufferFactory, byte[] startBoundary,
byte[] contentType, ResourceRegion region) {
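// Emit the part header for this region: boundary line, Content-Type, and Content-Range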
return Flux.just(
bufferFactory.allocateBuffer(startBoundary.length).write(startBoundary),
bufferFactory.allocateBuffer(contentType.length).write(contentType),
bufferFactory.wrap(ByteBuffer.wrap(getContentRangeHeader(region)))
);
}
private Flux<DataBuffer> writeResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) {
try {
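// Stream the whole resource, then skip to the region's start position and take only its byte count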
ReadableByteChannel resourceChannel = region.getResource().readableChannel();
Flux<DataBuffer> in = DataBufferUtils.read(resourceChannel, bufferFactory, this.bufferSize);
Flux<DataBuffer> skipped = DataBufferUtils.skipUntilByteCount(in, region.getPosition());
return DataBufferUtils.takeUntilByteCount(skipped, region.getCount());
}
catch (IOException exc) {
return Flux.error(exc);
}
}
private Flux<DataBuffer> getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) {
byte[] endBoundary = getAsciiBytes("\r\n--" + boundaryString + "--");
return Flux.just(bufferFactory.allocateBuffer(endBoundary.length).write(endBoundary));
}
private byte[] getAsciiBytes(String in) {
return in.getBytes(StandardCharsets.US_ASCII);
}
private byte[] getContentRangeHeader(ResourceRegion region) {
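// Build a Content-Range value of the form "bytes start-end/total", omitting the total when the resource length cannot be determined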
long start = region.getPosition();
long end = start + region.getCount() - 1;
OptionalLong contentLength = ResourceUtils.contentLength(region.getResource());
if (contentLength.isPresent()) {
return getAsciiBytes("Content-Range: bytes " + start + "-" + end + "/" + contentLength.getAsLong() + "\r\n\r\n");
}
else {
return getAsciiBytes("Content-Range: bytes " + start + "-" + end + "\r\n\r\n");
}
}
}

DataBufferUtils.java

@@ -31,10 +31,11 @@ import reactor.core.publisher.SynchronousSink;
import org.springframework.util.Assert;
/**i
/**
* Utility class for working with {@link DataBuffer}s.
*
* @author Arjen Poutsma
* @author Brian Clozel
* @since 5.0
*/
public abstract class DataBufferUtils {
@@ -119,6 +120,42 @@ public abstract class DataBufferUtils {
});
}
/**
* Skip buffers from the given {@link Publisher} until the total
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
* the given maximum byte count, or until the publisher is complete.
* @param publisher the publisher to filter
* @param maxByteCount the maximum byte count
* @return a flux with the remaining part of the given publisher
*/
public static Flux<DataBuffer> skipUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must not be negative");
AtomicLong byteCountDown = new AtomicLong(maxByteCount);
return Flux.from(publisher).
skipUntil(dataBuffer -> {
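// Release buffers that are skipped entirely; stop skipping at the buffer where the running countdown goes negative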
int delta = -dataBuffer.readableByteCount();
long currentCount = byteCountDown.addAndGet(delta);
if (currentCount < 0) {
return true;
}
else {
DataBufferUtils.release(dataBuffer);
return false;
}
}).
map(dataBuffer -> {
long currentCount = byteCountDown.get();
// slice first buffer, then let others flow through
if (currentCount < 0) {
int skip = (int) (currentCount + dataBuffer.readableByteCount());
byteCountDown.set(0);
return dataBuffer.slice(skip, dataBuffer.readableByteCount() - skip);
}
return dataBuffer;
});
}
/**
* Retain the given data buffer, if it is a {@link PooledDataBuffer}.
* @param dataBuffer the data buffer to retain

ResourceUtils.java

@@ -18,11 +18,16 @@ package org.springframework.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.util.OptionalLong;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
/**
* Utility methods for resolving resource locations to files in the
@@ -384,4 +389,23 @@ public abstract class ResourceUtils {
con.setUseCaches(con.getClass().getSimpleName().startsWith("JNLP"));
}
/**
* Determine, if possible, the content length of the given resource
* without reading it.
* @param resource the resource instance
* @return an {@link OptionalLong} with the content length of the resource,
* or an empty {@link OptionalLong} if the length cannot be determined
*/
public static OptionalLong contentLength(Resource resource) {
// Don't try to determine contentLength on InputStreamResource - cannot be read afterwards...
// Note: custom InputStreamResource subclasses could provide a pre-calculated content length!
if (InputStreamResource.class != resource.getClass()) {
try {
return OptionalLong.of(resource.contentLength());
}
catch (IOException ignored) {
}
}
return OptionalLong.empty();
}
}

ResourceRegionEncoderTests.java

@@ -0,0 +1,145 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import org.springframework.core.io.support.ResourceRegion;
import org.springframework.tests.TestSubscriber;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
/**
* Tests for the {@link ResourceRegionEncoder} class.
*
* @author Brian Clozel
*/
public class ResourceRegionEncoderTests extends AbstractDataBufferAllocatingTestCase {
private ResourceRegionEncoder encoder;
private Resource resource;
@Before
public void setUp() {
this.encoder = new ResourceRegionEncoder();
String content = "Spring Framework test resource content.";
this.resource = new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8));
this.bufferFactory = new DefaultDataBufferFactory();
}
@Test
public void canEncode() {
ResolvableType resourceRegion = ResolvableType.forClass(ResourceRegion.class);
MimeType allMimeType = MimeType.valueOf("*/*");
assertFalse(this.encoder.canEncode(ResolvableType.forClass(Resource.class),
MimeTypeUtils.APPLICATION_OCTET_STREAM));
assertFalse(this.encoder.canEncode(ResolvableType.forClass(Resource.class), allMimeType));
assertTrue(this.encoder.canEncode(resourceRegion, MimeTypeUtils.APPLICATION_OCTET_STREAM));
assertTrue(this.encoder.canEncode(resourceRegion, allMimeType));
}
@Test
public void shouldEncodeResourceRegion() throws Exception {
ResourceRegion region = new ResourceRegion(this.resource, 0, 6);
Flux<DataBuffer> result = this.encoder.encode(Mono.just(region), this.bufferFactory,
ResolvableType.forClass(ResourceRegion.class), MimeTypeUtils.APPLICATION_OCTET_STREAM,
Collections.emptyMap());
TestSubscriber.subscribe(result)
.assertNoError()
.assertComplete()
.assertValuesWith(stringConsumer("Spring"));
}
@Test
public void shouldEncodeMultipleResourceRegions() throws Exception {
Flux<ResourceRegion> regions = Flux.just(
new ResourceRegion(this.resource, 0, 6),
new ResourceRegion(this.resource, 7, 9),
new ResourceRegion(this.resource, 17, 4),
new ResourceRegion(this.resource, 22, 17)
);
String boundary = MimeTypeUtils.generateMultipartBoundaryString();
Flux<DataBuffer> result = this.encoder.encode(regions, this.bufferFactory,
ResolvableType.forClass(ResourceRegion.class),
MimeType.valueOf("text/plain"),
Collections.singletonMap(ResourceRegionEncoder.BOUNDARY_STRING_HINT, boundary)
);
Mono<DataBuffer> reduced = result
.reduce(bufferFactory.allocateBuffer(), (previous, current) -> {
previous.write(current);
DataBufferUtils.release(current);
return previous;
});
TestSubscriber
.subscribe(reduced)
.assertNoError()
.assertComplete()
.assertValuesWith(dataBuffer -> {
String content = DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8);
String[] ranges = StringUtils.tokenizeToStringArray(content, "\r\n", false, true);
assertThat(ranges[0], is("--" + boundary));
assertThat(ranges[1], is("Content-Type: text/plain"));
assertThat(ranges[2], is("Content-Range: bytes 0-5/39"));
assertThat(ranges[3], is("Spring"));
assertThat(ranges[4], is("--" + boundary));
assertThat(ranges[5], is("Content-Type: text/plain"));
assertThat(ranges[6], is("Content-Range: bytes 7-15/39"));
assertThat(ranges[7], is("Framework"));
assertThat(ranges[8], is("--" + boundary));
assertThat(ranges[9], is("Content-Type: text/plain"));
assertThat(ranges[10], is("Content-Range: bytes 17-20/39"));
assertThat(ranges[11], is("test"));
assertThat(ranges[12], is("--" + boundary));
assertThat(ranges[13], is("Content-Type: text/plain"));
assertThat(ranges[14], is("Content-Range: bytes 22-38/39"));
assertThat(ranges[15], is("resource content."));
assertThat(ranges[16], is("--" + boundary + "--"));
});
}
}
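
The multipart assertions above correspond to serialized output of
roughly the following shape (boundary value illustrative; lines are
CRLF-delimited, and each part is preceded by a CRLF before its
boundary line):

--<boundary>
Content-Type: text/plain
Content-Range: bytes 0-5/39

Spring
--<boundary>
Content-Type: text/plain
Content-Range: bytes 7-15/39

Framework
...
--<boundary>--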

DataBufferUtilsTests.java

@@ -99,4 +99,34 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
release(baz);
}
@Test
public void skipUntilByteCount() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
Flux<DataBuffer> result = DataBufferUtils.skipUntilByteCount(flux, 5L);
TestSubscriber
.subscribe(result)
.assertNoError()
.assertComplete()
.assertValuesWith(stringConsumer("r"), stringConsumer("baz"));
}
@Test
public void skipUntilByteCountShouldSkipAll() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
Flux<DataBuffer> result = DataBufferUtils.skipUntilByteCount(flux, 9L);
TestSubscriber
.subscribe(result)
.assertNoError()
.assertNoValues()
.assertComplete();
}
}

ResourceHttpMessageWriter.java

@@ -28,13 +28,13 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.ResourceDecoder;
import org.springframework.core.codec.ResourceEncoder;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.MediaTypeFactory;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.util.ResourceUtils;
/**
* Implementation of {@link HttpMessageWriter} that can write
@@ -81,7 +81,7 @@ public class ResourceHttpMessageWriter extends EncoderHttpMessageWriter<Resource
headers.setContentType(mediaType);
}
if (headers.getContentLength() < 0) {
contentLength(resource).ifPresent(headers::setContentLength);
ResourceUtils.contentLength(resource).ifPresent(headers::setContentLength);
}
}
@@ -101,19 +101,6 @@ public class ResourceHttpMessageWriter extends EncoderHttpMessageWriter<Resource
outputMessage.getHeaders().getContentType(), outputMessage, hints);
}
private static Optional<Long> contentLength(Resource resource) {
// Don't try to determine contentLength on InputStreamResource - cannot be read afterwards...
// Note: custom InputStreamResource subclasses could provide a pre-calculated content length!
if (InputStreamResource.class != resource.getClass()) {
try {
return Optional.of(resource.contentLength());
}
catch (IOException ignored) {
}
}
return Optional.empty();
}
private static Optional<File> getFile(Resource resource) {
if (resource.isFile()) {
try {