Fix memory leak when canceling read from AsynchronousFileChannel

This commit fixes a memory leak that occurs when reading from a
AsynchronousFileChannel, and cancelling the subscription.

Issue: SPR-17419
This commit is contained in:
Arjen Poutsma 2018-10-23 16:23:12 +02:00
parent 28cf7b728f
commit eac9e66c46
2 changed files with 38 additions and 1 deletions

View File

@ -139,12 +139,14 @@ public abstract class DataBufferUtils {
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
Flux<DataBuffer> result = Flux.using(channelSupplier,
channel -> Flux.create(sink -> {
CompletionHandler<Integer, DataBuffer> completionHandler =
AsynchronousFileChannelReadCompletionHandler completionHandler =
new AsynchronousFileChannelReadCompletionHandler(channel,
sink, position, dataBufferFactory, bufferSize);
channel.read(byteBuffer, position, dataBuffer, completionHandler);
sink.onDispose(completionHandler::dispose);
}),
DataBufferUtils::closeChannel);
@ -545,6 +547,10 @@ public abstract class DataBufferUtils {
release(dataBuffer);
this.sink.error(exc);
}
public void dispose() {
this.disposed.set(true);
}
}

View File

@ -40,6 +40,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
@ -142,6 +143,36 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.verify(Duration.ofSeconds(5));
}
@Test
public void readResourcePositionAndTakeUntil() throws Exception {
Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass());
Flux<DataBuffer> flux = DataBufferUtils.read(resource, 3, this.bufferFactory, 3);
flux = DataBufferUtils.takeUntilByteCount(flux, 5);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("ba"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void readByteArrayResourcePositionAndTakeUntil() throws Exception {
Resource resource = new ByteArrayResource("foobarbazqux" .getBytes());
Flux<DataBuffer> flux = DataBufferUtils.read(resource, 3, this.bufferFactory, 3);
flux = DataBufferUtils.takeUntilByteCount(flux, 5);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("ba"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void writeOutputStream() throws Exception {
DataBuffer foo = stringBuffer("foo");