Review DataBufferUtils for cancellation memory leaks

Issue: SPR-17408
This commit is contained in:
Arjen Poutsma 2018-10-24 16:27:12 +02:00
parent 611019b73c
commit 488a1d4561
3 changed files with 298 additions and 115 deletions

View File

@ -246,23 +246,12 @@ public abstract class DataBufferUtils {
Assert.notNull(channel, "'channel' must not be null");
Flux<DataBuffer> flux = Flux.from(source);
return Flux.create(sink ->
flux.subscribe(dataBuffer -> {
try {
ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
while (byteBuffer.hasRemaining()) {
channel.write(byteBuffer);
}
sink.next(dataBuffer);
}
catch (IOException ex) {
sink.next(dataBuffer);
sink.error(ex);
}
},
sink::error,
sink::complete));
return Flux.create(sink -> {
WritableByteChannelSubscriber subscriber =
new WritableByteChannelSubscriber(sink, channel);
sink.onDispose(subscriber);
flux.subscribe(subscriber);
});
}
/**
@ -305,11 +294,15 @@ public abstract class DataBufferUtils {
Assert.isTrue(position >= 0, "'position' must be >= 0");
Flux<DataBuffer> flux = Flux.from(source);
return Flux.create(sink ->
flux.subscribe(new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position)));
return Flux.create(sink -> {
AsynchronousFileChannelWriteCompletionHandler completionHandler =
new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
sink.onDispose(completionHandler);
flux.subscribe(completionHandler);
});
}
private static void closeChannel(@Nullable Channel channel) {
static void closeChannel(@Nullable Channel channel) {
if (channel != null && channel.isOpen()) {
try {
channel.close();
@ -554,6 +547,50 @@ public abstract class DataBufferUtils {
}
private static class WritableByteChannelSubscriber extends BaseSubscriber<DataBuffer> {
private final FluxSink<DataBuffer> sink;
private final WritableByteChannel channel;
public WritableByteChannelSubscriber(FluxSink<DataBuffer> sink, WritableByteChannel channel) {
this.sink = sink;
this.channel = channel;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(DataBuffer dataBuffer) {
try {
ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
while (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer);
}
this.sink.next(dataBuffer);
request(1);
}
catch (IOException ex) {
this.sink.next(dataBuffer);
this.sink.error(ex);
}
}
@Override
protected void hookOnError(Throwable throwable) {
this.sink.error(throwable);
}
@Override
protected void hookOnComplete() {
this.sink.complete();
}
}
private static class AsynchronousFileChannelWriteCompletionHandler extends BaseSubscriber<DataBuffer>
implements CompletionHandler<Integer, ByteBuffer> {

View File

@ -30,6 +30,7 @@ import org.junit.Rule;
import org.junit.rules.Verifier;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
@ -69,6 +70,10 @@ public abstract class AbstractDataBufferAllocatingTestCase {
return byteBuffer(value.getBytes(StandardCharsets.UTF_8));
}
protected Mono<DataBuffer> deferStringBuffer(String value) {
return Mono.defer(() -> Mono.just(stringBuffer(value)));
}
protected DataBuffer byteBuffer(byte[] value) {
DataBuffer buffer = this.bufferFactory.allocateBuffer(value.length);
buffer.write(value);

View File

@ -34,6 +34,7 @@ import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import io.netty.buffer.ByteBuf;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Flux;
@ -54,93 +55,160 @@ import static org.mockito.Mockito.*;
*/
public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
private Resource resource;
private Path tempFile;
@Before
public void setUp() throws IOException {
this.resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass());
this.tempFile = Files.createTempFile("DataBufferUtilsTests", null);
}
@Test
public void readInputStream() {
Flux<DataBuffer> flux = DataBufferUtils.readInputStream(
() -> this.resource.getInputStream(), this.bufferFactory, 3);
verifyReadData(flux);
}
@Test
public void readByteChannel() throws Exception {
URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI();
Flux<DataBuffer> flux =
URI uri = this.resource.getURI();
Flux<DataBuffer> result =
DataBufferUtils.readByteChannel(() -> FileChannel.open(Paths.get(uri), StandardOpenOption.READ),
this.bufferFactory, 3);
StepVerifier.create(flux)
verifyReadData(result);
}
@Test
public void readByteChannelError() throws Exception {
ReadableByteChannel channel = mock(ReadableByteChannel.class);
when(channel.read(any()))
.thenAnswer(invocation -> {
ByteBuffer buffer = invocation.getArgument(0);
buffer.put("foo".getBytes(StandardCharsets.UTF_8));
buffer.flip();
return 3;
})
.thenThrow(new IOException());
Flux<DataBuffer> result =
DataBufferUtils.readByteChannel(() -> channel, this.bufferFactory, 3);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
.expectError(IOException.class)
.verify(Duration.ofSeconds(3));
}
@Test
public void readByteChannelCancel() throws Exception {
URI uri = this.resource.getURI();
Flux<DataBuffer> result =
DataBufferUtils.readByteChannel(() -> FileChannel.open(Paths.get(uri), StandardOpenOption.READ),
this.bufferFactory, 3);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.thenCancel()
.verify();
}
@Test
public void readAsynchronousFileChannel() throws Exception {
URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI();
URI uri = this.resource.getURI();
Flux<DataBuffer> flux = DataBufferUtils.readAsynchronousFileChannel(
() -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ),
this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
verifyReadData(flux);
}
@Test
public void readAsynchronousFileChannelPosition() throws Exception {
URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI();
URI uri = this.resource.getURI();
Flux<DataBuffer> flux = DataBufferUtils.readAsynchronousFileChannel(
() -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ),
3, this.bufferFactory, 3);
9, this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void readInputStream() throws Exception {
Flux<DataBuffer> flux = DataBufferUtils.readInputStream(
() -> DataBufferUtilsTests.class.getResourceAsStream("DataBufferUtilsTests.txt"),
public void readAsynchronousFileChannelError() throws Exception {
AsynchronousFileChannel channel = mock(AsynchronousFileChannel.class);
doAnswer(invocation -> {
ByteBuffer byteBuffer = invocation.getArgument(0);
byteBuffer.put("foo".getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
long pos = invocation.getArgument(1);
assertEquals(0, pos);
DataBuffer dataBuffer = invocation.getArgument(2);
CompletionHandler<Integer, DataBuffer> completionHandler = invocation.getArgument(3);
completionHandler.completed(3, dataBuffer);
return null;
}).doAnswer(invocation -> {
DataBuffer dataBuffer = invocation.getArgument(2);
CompletionHandler<Integer, DataBuffer> completionHandler = invocation.getArgument(3);
completionHandler.failed(new IOException(), dataBuffer);
return null;
})
.when(channel).read(any(), anyLong(), any(), any());
Flux<DataBuffer> result =
DataBufferUtils.readAsynchronousFileChannel(() -> channel, this.bufferFactory, 3);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.expectError(IOException.class)
.verify(Duration.ofSeconds(3));
}
@Test
public void readAsynchronousFileChannelCancel() throws Exception {
URI uri = this.resource.getURI();
Flux<DataBuffer> flux = DataBufferUtils.readAsynchronousFileChannel(
() -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ),
this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.thenCancel()
.verify();
}
@Test
public void readResource() throws Exception {
Flux<DataBuffer> flux = DataBufferUtils.read(this.resource, this.bufferFactory, 3);
verifyReadData(flux);
}
@Test
public void readResourcePosition() throws Exception {
Flux<DataBuffer> flux = DataBufferUtils.read(this.resource, 9, this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void readResource() throws Exception {
Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass());
Flux<DataBuffer> flux = DataBufferUtils.read(resource, this.bufferFactory, 3);
StepVerifier.create(flux)
private void verifyReadData(Flux<DataBuffer> buffers) {
StepVerifier.create(buffers)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void readResourcePosition() throws Exception {
Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass());
Flux<DataBuffer> flux = DataBufferUtils.read(resource, 3, this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
.verify(Duration.ofSeconds(3));
}
@Test
@ -181,21 +249,10 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
DataBuffer qux = stringBuffer("qux");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz, qux);
Path tempFile = Files.createTempFile("DataBufferUtilsTests", null);
OutputStream os = Files.newOutputStream(tempFile);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, os);
StepVerifier.create(writeResult)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
String result = String.join("", Files.readAllLines(tempFile));
assertEquals("foobarbazqux", result);
verifyWrittenData(writeResult);
os.close();
}
@ -207,21 +264,10 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
DataBuffer qux = stringBuffer("qux");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz, qux);
Path tempFile = Files.createTempFile("DataBufferUtilsTests", null);
WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
StepVerifier.create(writeResult)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
String result = String.join("", Files.readAllLines(tempFile));
assertEquals("foobarbazqux", result);
verifyWrittenData(writeResult);
channel.close();
}
@ -231,7 +277,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
DataBuffer bar = stringBuffer("bar");
Flux<DataBuffer> flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException()));
Path tempFile = Files.createTempFile("DataBufferUtilsTests", null);
WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
@ -268,11 +313,33 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.expectError(IOException.class)
.verify();
.verify(Duration.ofSeconds(3));
channel.close();
}
@Test
public void writeWritableByteChannelCancel() throws Exception {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
Flux<DataBuffer> flux = Flux.just(foo, bar);
WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
StepVerifier.create(writeResult, 1)
.consumeNextWith(stringConsumer("foo"))
.thenCancel()
.verify(Duration.ofSeconds(5));
String result = String.join("", Files.readAllLines(tempFile));
assertEquals("foo", result);
channel.close();
flux.subscribe(DataBufferUtils::release);
}
@Test
public void writeAsynchronousFileChannel() throws Exception {
DataBuffer foo = stringBuffer("foo");
@ -281,23 +348,26 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
DataBuffer qux = stringBuffer("qux");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz, qux);
Path tempFile = Files.createTempFile("DataBufferUtilsTests", null);
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
verifyWrittenData(writeResult);
channel.close();
}
private void verifyWrittenData(Flux<DataBuffer> writeResult) throws IOException {
StepVerifier.create(writeResult)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
.verify(Duration.ofSeconds(3));
String result = String.join("", Files.readAllLines(tempFile));
assertEquals("foobarbazqux", result);
channel.close();
}
@Test
@ -307,7 +377,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
Flux<DataBuffer> flux =
Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));
Path tempFile = Files.createTempFile("DataBufferUtilsTests", null);
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);
@ -324,7 +393,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
channel.close();
}
@Test
public void writeAsynchronousFileChannelErrorInWrite() throws Exception {
DataBuffer foo = stringBuffer("foo");
@ -365,6 +433,29 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
channel.close();
}
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
Flux<DataBuffer> flux = Flux.just(foo, bar);
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
StepVerifier.create(writeResult, 1)
.consumeNextWith(stringConsumer("foo"))
.thenCancel()
.verify();
String result = String.join("", Files.readAllLines(tempFile));
assertEquals("foo", result);
channel.close();
flux.subscribe(DataBufferUtils::release);
}
@Test
public void readAndWriteByteChannel() throws Exception {
Path source = Paths.get(
@ -386,12 +477,14 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
String result = String.join("", Files.readAllLines(destination));
assertEquals(expected, result);
channel.close();
}
catch (IOException e) {
fail(e.getMessage());
}
finally {
DataBufferUtils.closeChannel(channel);
}
});
}
@ -418,13 +511,15 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
String result = String.join("", Files.readAllLines(destination));
assertEquals(expected, result);
channel.close();
latch.countDown();
}
catch (IOException e) {
fail(e.getMessage());
}
finally {
DataBufferUtils.closeChannel(channel);
}
});
latch.await();
@ -444,12 +539,28 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
}
@Test
public void takeUntilByteCountErrorInFlux() {
DataBuffer foo = stringBuffer("foo");
Flux<DataBuffer> flux =
Flux.just(foo).concatWith(Mono.error(new RuntimeException()));
public void takeUntilByteCountCanceled() {
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo"),
deferStringBuffer("bar")
);
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(
source, 5L);
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(flux, 5L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.thenCancel()
.verify(Duration.ofSeconds(5));
}
@Test
public void takeUntilByteCountError() {
Flux<DataBuffer> source = Flux.concat(
Mono.defer(() -> Mono.just(stringBuffer("foo"))),
Mono.error(new RuntimeException())
);
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(source, 5L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
@ -459,28 +570,29 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
@Test
public void takeUntilByteCountExact() {
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo"),
deferStringBuffer("bar"),
deferStringBuffer("baz")
);
DataBuffer extraBuffer = stringBuffer("baz");
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(
Flux.just(stringBuffer("foo"), stringBuffer("bar"), extraBuffer), 6L);
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(source, 6L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.expectComplete()
.verify(Duration.ofSeconds(5));
release(extraBuffer);
}
@Test
public void skipUntilByteCount() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
Flux<DataBuffer> result = DataBufferUtils.skipUntilByteCount(flux, 5L);
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo"),
deferStringBuffer("bar"),
deferStringBuffer("baz")
);
Flux<DataBuffer> result = DataBufferUtils.skipUntilByteCount(source, 5L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("r"))
@ -489,6 +601,20 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.verify(Duration.ofSeconds(5));
}
@Test
public void skipUntilByteCountCancelled() {
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo"),
deferStringBuffer("bar")
);
Flux<DataBuffer> result = DataBufferUtils.skipUntilByteCount(source, 5L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("r"))
.thenCancel()
.verify(Duration.ofSeconds(5));
}
@Test
public void skipUntilByteCountErrorInFlux() {
DataBuffer foo = stringBuffer("foo");
@ -510,7 +636,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
Flux<DataBuffer> result = DataBufferUtils.skipUntilByteCount(flux, 9L);
StepVerifier.create(result)
.expectNextCount(0)
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@ -594,4 +719,20 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.verify();
}
@Test
public void joinCanceled() {
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo"),
deferStringBuffer("bar"),
deferStringBuffer("baz")
);
Mono<DataBuffer> result = DataBufferUtils.join(source);
StepVerifier.create(result)
.thenCancel()
.verify();
}
}