Add DataBufferUtils.read w/ AsynchFileChannel

This commit adds an overloaded DataBufferUtils.read method that operates
on an AsynchronousFileChannel (as opposed to a ReadableByteChannel, for
which an overload already existed). This commit also uses the new method
in the Resource encoders when the Resource is backed by a file.
Arjen Poutsma 2017-03-09 13:58:04 +01:00
parent e4b1a953bf
commit 63118c1ea7
6 changed files with 228 additions and 44 deletions
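
The new overload can be exercised along the following lines; a minimal sketch, assuming an existing file to read — the path, buffer size, and class name are illustrative and not part of the commit:

import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import reactor.core.publisher.Flux;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

public class AsyncFileReadExample {

	public static void main(String[] args) throws Exception {
		// Open the file for asynchronous reads ("example.txt" is a placeholder path)
		AsynchronousFileChannel channel =
				AsynchronousFileChannel.open(Paths.get("example.txt"), StandardOpenOption.READ);

		// Read the file into a Flux of DataBuffers of at most 4096 bytes each;
		// the new overload closes the channel once the flux terminates or is cancelled
		Flux<DataBuffer> buffers =
				DataBufferUtils.read(channel, new DefaultDataBufferFactory(), 4096);

		// Sum the bytes read, releasing each buffer after inspecting it
		Integer total = buffers
				.map(buffer -> {
					int count = buffer.readableByteCount();
					DataBufferUtils.release(buffer);
					return count;
				})
				.reduce(0, Integer::sum)
				.block();

		System.out.println("Read " + total + " bytes");
	}
}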

ResourceEncoder.java

@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,8 +16,11 @@
package org.springframework.core.codec;
+ import java.io.File;
import java.io.IOException;
+ import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.ReadableByteChannel;
+ import java.nio.file.StandardOpenOption;
import java.util.Map;
import reactor.core.publisher.Flux;
@ -66,6 +69,18 @@ public class ResourceEncoder extends AbstractSingleValueEncoder<Resource> {
protected Flux<DataBuffer> encode(Resource resource, DataBufferFactory dataBufferFactory,
ResolvableType type, MimeType mimeType, Map<String, Object> hints) {
+ try {
+ if (resource.isFile()) {
+ File file = resource.getFile();
+ AsynchronousFileChannel channel =
+ AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
+ return DataBufferUtils.read(channel, dataBufferFactory, this.bufferSize);
+ }
+ }
+ catch (IOException ignore) {
+ // fallback to resource.readableChannel(), below
+ }
try {
ReadableByteChannel channel = resource.readableChannel();
return DataBufferUtils.read(channel, dataBufferFactory, this.bufferSize);

ResourceRegionEncoder.java

@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,10 +16,13 @@
package org.springframework.core.codec;
+ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+ import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
+ import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.OptionalLong;
@ -113,14 +116,31 @@ public class ResourceRegionEncoder extends AbstractEncoder<ResourceRegion> {
}
private Flux<DataBuffer> writeResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) {
+ Flux<DataBuffer> in = readResourceRegion(region, bufferFactory);
+ return DataBufferUtils.takeUntilByteCount(in, region.getCount());
+ }
+ private Flux<DataBuffer> readResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) {
+ Resource resource = region.getResource();
try {
- ReadableByteChannel resourceChannel = region.getResource().readableChannel();
- Flux<DataBuffer> in = DataBufferUtils.read(resourceChannel, bufferFactory, this.bufferSize);
- Flux<DataBuffer> skipped = DataBufferUtils.skipUntilByteCount(in, region.getPosition());
- return DataBufferUtils.takeUntilByteCount(skipped, region.getCount());
+ if (resource.isFile()) {
+ File file = region.getResource().getFile();
+ AsynchronousFileChannel channel =
+ AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
+ return DataBufferUtils.read(channel, region.getPosition(),
+ bufferFactory, this.bufferSize);
+ }
}
- catch (IOException exc) {
- return Flux.error(exc);
+ catch (IOException ignore) {
+ // fallback to resource.readableChannel(), below
}
+ try {
+ ReadableByteChannel channel = resource.readableChannel();
+ Flux<DataBuffer> in = DataBufferUtils.read(channel, bufferFactory, this.bufferSize);
+ return DataBufferUtils.skipUntilByteCount(in, region.getPosition());
+ }
+ catch (IOException ex) {
+ return Flux.error(ex);
+ }
}
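
The refactoring separates the two concerns: readResourceRegion positions the read (either by starting the AsynchronousFileChannel read at region.getPosition(), or by skipping bytes from a ReadableByteChannel), and writeResourceRegion caps the output at region.getCount(). A rough sketch of the two equivalent pipelines, assuming already-open channels and a DataBufferFactory; position 7, count 9, and buffer size 1024 are illustrative:

// Positional read: the asynchronous channel starts emitting at byte 7
static Flux<DataBuffer> regionViaAsyncChannel(AsynchronousFileChannel channel, DataBufferFactory bufferFactory) {
	Flux<DataBuffer> in = DataBufferUtils.read(channel, 7, bufferFactory, 1024);
	return DataBufferUtils.takeUntilByteCount(in, 9);
}

// Fallback: read from byte 0, drop the first 7 bytes, then cap at 9 bytes
static Flux<DataBuffer> regionViaReadableChannel(ReadableByteChannel channel, DataBufferFactory bufferFactory) {
	Flux<DataBuffer> in = DataBufferUtils.read(channel, bufferFactory, 1024);
	return DataBufferUtils.takeUntilByteCount(
			DataBufferUtils.skipUntilByteCount(in, 7), 9);
}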

DataBufferUtils.java

@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,14 +19,17 @@ package org.springframework.core.io.buffer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+ import java.nio.channels.AsynchronousFileChannel;
+ import java.nio.channels.Channel;
import java.nio.channels.Channels;
+ import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
- import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
+ import reactor.core.publisher.FluxSink;
import reactor.core.publisher.SynchronousSink;
import org.springframework.util.Assert;
@ -40,17 +43,6 @@ import org.springframework.util.Assert;
*/
public abstract class DataBufferUtils {
- private static final Consumer<ReadableByteChannel> CLOSE_CONSUMER = channel -> {
- try {
- if (channel != null) {
- channel.close();
- }
- }
- catch (IOException ex) {
- }
- };
/**
* Read the given {@code InputStream} into a {@code Flux} of
* {@code DataBuffer}s. Closes the input stream when the flux is terminated.
@ -85,7 +77,58 @@ public abstract class DataBufferUtils {
return Flux.generate(() -> channel,
new ReadableByteChannelGenerator(dataBufferFactory, bufferSize),
- CLOSE_CONSUMER);
+ DataBufferUtils::closeChannel);
}
/**
* Read the given {@code AsynchronousFileChannel} into a {@code Flux} of
* {@code DataBuffer}s. Closes the channel when the flux is terminated.
* @param channel the channel to read from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
*/
public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
DataBufferFactory dataBufferFactory, int bufferSize) {
return read(channel, 0, dataBufferFactory, bufferSize);
}
/**
* Read the given {@code AsynchronousFileChannel} into a {@code Flux} of
* {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is
* terminated.
* @param channel the channel to read from
* @param position the position to start reading from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
*/
public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
long position, DataBufferFactory dataBufferFactory, int bufferSize) {
Assert.notNull(channel, "'channel' must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.isTrue(position >= 0, "'position' must be >= 0");
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
return Flux.create(emitter -> {
emitter.setCancellation(() -> closeChannel(channel));
AsynchronousFileChannelCompletionHandler completionHandler =
new AsynchronousFileChannelCompletionHandler(emitter, position,
dataBufferFactory, byteBuffer);
channel.read(byteBuffer, position, channel, completionHandler);
});
}
private static void closeChannel(Channel channel) {
try {
if (channel != null) {
channel.close();
}
}
catch (IOException ignored) {
}
}
/**
@ -189,24 +232,23 @@ public abstract class DataBufferUtils {
private final DataBufferFactory dataBufferFactory;
- private final int chunkSize;
+ private final ByteBuffer byteBuffer;
public ReadableByteChannelGenerator(DataBufferFactory dataBufferFactory, int chunkSize) {
this.dataBufferFactory = dataBufferFactory;
- this.chunkSize = chunkSize;
+ this.byteBuffer = ByteBuffer.allocate(chunkSize);
}
@Override
public ReadableByteChannel apply(ReadableByteChannel channel, SynchronousSink<DataBuffer> sub) {
try {
- ByteBuffer byteBuffer = ByteBuffer.allocate(chunkSize);
int read;
- if ((read = channel.read(byteBuffer)) >= 0) {
- byteBuffer.flip();
+ if ((read = channel.read(this.byteBuffer)) >= 0) {
+ this.byteBuffer.flip();
boolean release = true;
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read);
try {
- dataBuffer.write(byteBuffer);
+ dataBuffer.write(this.byteBuffer);
release = false;
sub.next(dataBuffer);
}
@ -215,6 +257,7 @@ public abstract class DataBufferUtils {
release(dataBuffer);
}
}
+ this.byteBuffer.clear();
}
else {
sub.complete();
@ -227,4 +270,58 @@ public abstract class DataBufferUtils {
}
}
private static class AsynchronousFileChannelCompletionHandler
implements CompletionHandler<Integer, AsynchronousFileChannel> {
private final FluxSink<DataBuffer> emitter;
private final ByteBuffer byteBuffer;
private final DataBufferFactory dataBufferFactory;
private long position;
private AsynchronousFileChannelCompletionHandler(FluxSink<DataBuffer> emitter,
long position, DataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
this.emitter = emitter;
this.position = position;
this.dataBufferFactory = dataBufferFactory;
this.byteBuffer = byteBuffer;
}
@Override
public void completed(Integer read, AsynchronousFileChannel channel) {
if (read != -1) {
this.position += read;
this.byteBuffer.flip();
boolean release = true;
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read);
try {
dataBuffer.write(this.byteBuffer);
release = false;
this.emitter.next(dataBuffer);
}
finally {
if (release) {
release(dataBuffer);
}
}
this.byteBuffer.clear();
if (!this.emitter.isCancelled()) {
channel.read(this.byteBuffer, this.position, channel, this);
}
}
else {
this.emitter.complete();
closeChannel(channel);
}
}
@Override
public void failed(Throwable exc, AsynchronousFileChannel channel) {
this.emitter.error(exc);
closeChannel(channel);
}
}
}
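
Both new read overloads are driven by AsynchronousFileChannelCompletionHandler, which re-submits a read of the shared ByteBuffer after every completed chunk, advances the position by the number of bytes read, and completes the sink at end of file (read == -1) or stops re-submitting once the sink is cancelled. A small consumption sketch built on the position-based overload; the helper method name, buffer size, and use of Collectors.joining are illustrative rather than part of this commit:

// Reads the given file from "position" onwards and assembles the emitted buffers into a String.
static Mono<String> readFileFrom(Path file, long position, DataBufferFactory bufferFactory)
		throws IOException {
	AsynchronousFileChannel channel =
			AsynchronousFileChannel.open(file, StandardOpenOption.READ);
	return DataBufferUtils.read(channel, position, bufferFactory, 16)
			.map(buffer -> {
				byte[] bytes = new byte[buffer.readableByteCount()];
				buffer.read(bytes);
				DataBufferUtils.release(buffer);
				return new String(bytes, StandardCharsets.UTF_8);
			})
			.collect(Collectors.joining());
}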

ResourceRegionEncoderTests.java

@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,6 +27,7 @@ import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
+ import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;
@ -51,13 +52,9 @@ public class ResourceRegionEncoderTests extends AbstractDataBufferAllocatingTest
private ResourceRegionEncoder encoder;
- private Resource resource;
@Before
public void setUp() {
this.encoder = new ResourceRegionEncoder();
- String content = "Spring Framework test resource content.";
- this.resource = new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8));
this.bufferFactory = new DefaultDataBufferFactory();
}
@ -74,12 +71,23 @@ public class ResourceRegionEncoderTests extends AbstractDataBufferAllocatingTest
}
@Test
- public void shouldEncodeResourceRegion() throws Exception {
+ public void shouldEncodeResourceRegionFileResource() throws Exception {
+ shouldEncodeResourceRegion(
+ new ClassPathResource("ResourceRegionEncoderTests.txt", getClass()));
+ }
- ResourceRegion region = new ResourceRegion(this.resource, 0, 6);
+ @Test
+ public void shouldEncodeResourceRegionByteArrayResource() throws Exception {
+ String content = "Spring Framework test resource content.";
+ shouldEncodeResourceRegion(new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8)));
+ }
+ private void shouldEncodeResourceRegion(Resource resource) {
+ ResourceRegion region = new ResourceRegion(resource, 0, 6);
Flux<DataBuffer> result = this.encoder.encode(Mono.just(region), this.bufferFactory,
- ResolvableType.forClass(ResourceRegion.class), MimeTypeUtils.APPLICATION_OCTET_STREAM
- , Collections.emptyMap());
+ ResolvableType.forClass(ResourceRegion.class),
+ MimeTypeUtils.APPLICATION_OCTET_STREAM,
+ Collections.emptyMap());
StepVerifier.create(result)
.consumeNextWith(stringConsumer("Spring"))
@ -88,13 +96,24 @@ public class ResourceRegionEncoderTests extends AbstractDataBufferAllocatingTest
}
@Test
- public void shouldEncodeMultipleResourceRegions() throws Exception {
+ public void shouldEncodeMultipleResourceRegionsFileResource() throws Exception {
+ shouldEncodeMultipleResourceRegions(
+ new ClassPathResource("ResourceRegionEncoderTests.txt", getClass()));
+ }
+ @Test
+ public void shouldEncodeMultipleResourceRegionsByteArrayResource() throws Exception {
+ String content = "Spring Framework test resource content.";
+ shouldEncodeMultipleResourceRegions(
+ new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8)));
+ }
+ private void shouldEncodeMultipleResourceRegions(Resource resource) {
Flux<ResourceRegion> regions = Flux.just(
- new ResourceRegion(this.resource, 0, 6),
- new ResourceRegion(this.resource, 7, 9),
- new ResourceRegion(this.resource, 17, 4),
- new ResourceRegion(this.resource, 22, 17)
+ new ResourceRegion(resource, 0, 6),
+ new ResourceRegion(resource, 7, 9),
+ new ResourceRegion(resource, 17, 4),
+ new ResourceRegion(resource, 22, 17)
);
String boundary = MimeTypeUtils.generateMultipartBoundaryString();

DataBufferUtilsTests.java

@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,7 @@ package org.springframework.core.io.buffer;
import java.io.InputStream;
import java.net.URI;
+ import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
@ -34,7 +35,7 @@ import static org.junit.Assert.assertFalse;
public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
@Test
- public void readChannel() throws Exception {
+ public void readReadableByteChannel() throws Exception {
URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI();
FileChannel channel = FileChannel.open(Paths.get(uri), StandardOpenOption.READ);
Flux<DataBuffer> flux = DataBufferUtils.read(channel, this.bufferFactory, 3);
@ -50,6 +51,37 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
assertFalse(channel.isOpen());
}
@Test
public void readAsynchronousFileChannel() throws Exception {
URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI();
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ);
Flux<DataBuffer> flux = DataBufferUtils.read(channel, this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify();
}
@Test
public void readAsynchronousFileChannelPosition() throws Exception {
URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI();
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ);
Flux<DataBuffer> flux = DataBufferUtils.read(channel, 3, this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify();
}
@Test
public void readUnalignedChannel() throws Exception {
URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI();

ResourceRegionEncoderTests.txt (new file)

@ -0,0 +1 @@
Spring Framework test resource content.