From c802827f0f4bd9ef18658645492b7bcc653fd2b6 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Fri, 14 Jul 2017 18:55:04 +0200 Subject: [PATCH] Change write methods to return Flux This commit changes the write methods to return `Flux` instead of `Mono`, giving access to the original buffers, so that they can decided whether the buffers need to be closed or not. Issue: SPR-15726 --- .../core/io/buffer/DataBufferUtils.java | 96 ++++++++++++------- .../core/io/buffer/DataBufferUtilsTests.java | 44 ++++++++- 2 files changed, 104 insertions(+), 36 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 9fd809fe28c..6d47cfeed64 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -26,16 +26,16 @@ import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.function.Consumer; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; import reactor.core.publisher.SynchronousSink; import org.springframework.lang.Nullable; @@ -50,6 +50,12 @@ import org.springframework.util.Assert; */ public abstract class DataBufferUtils { + private static final Consumer RELEASE_CONSUMER = DataBufferUtils::release; + + //--------------------------------------------------------------------- + // Reading + //--------------------------------------------------------------------- + /** * Read the given {@code InputStream} into a {@code Flux} of * {@code DataBuffer}s. Closes the input stream when the flux is terminated. @@ -128,19 +134,24 @@ public abstract class DataBufferUtils { }); } + //--------------------------------------------------------------------- + // Writing + //--------------------------------------------------------------------- + /** * Write the given stream of {@link DataBuffer}s to the given {@code OutputStream}. Does - * not close the output stream when the flux is terminated, but - * does {@linkplain #release(DataBuffer) release} the data buffers in the - * source. - *

Note that the writing process does not start until the returned {@code Mono} is subscribed + * not close the output stream when the flux is terminated, and does + * not {@linkplain #release(DataBuffer) release} the data buffers in the + * source. If releasing is required, then subscribe to the returned {@code Flux} with a + * {@link #releaseConsumer()}. + *

Note that the writing process does not start until the returned {@code Flux} is subscribed * to. * @param source the stream of data buffers to be written * @param outputStream the output stream to write to - * @return a mono that starts the writing process when subscribed to, and that indicates the - * completion of the process + * @return a flux containing the same buffers as in {@code source}, that starts the writing + * process when subscribed to, and that publishes any writing errors and the completion signal */ - public static Mono write(Publisher source, + public static Flux write(Publisher source, OutputStream outputStream) { Assert.notNull(source, "'source' must not be null"); @@ -152,17 +163,18 @@ public abstract class DataBufferUtils { /** * Write the given stream of {@link DataBuffer}s to the given {@code WritableByteChannel}. Does - * not close the channel when the flux is terminated, but - * does {@linkplain #release(DataBuffer) release} the data buffers in the - * source. - *

Note that the writing process does not start until the returned {@code Mono} is subscribed + * not close the channel when the flux is terminated, and does + * not {@linkplain #release(DataBuffer) release} the data buffers in the + * source. If releasing is required, then subscribe to the returned {@code Flux} with a + * {@link #releaseConsumer()}. + *

Note that the writing process does not start until the returned {@code Flux} is subscribed * to. * @param source the stream of data buffers to be written * @param channel the channel to write to - * @return a mono that starts the writing process when subscribed to, and that indicates the - * completion of the process + * @return a flux containing the same buffers as in {@code source}, that starts the writing + * process when subscribed to, and that publishes any writing errors and the completion signal */ - public static Mono write(Publisher source, + public static Flux write(Publisher source, WritableByteChannel channel) { Assert.notNull(source, "'source' must not be null"); @@ -170,14 +182,14 @@ public abstract class DataBufferUtils { Flux flux = Flux.from(source); - return Mono.create(sink -> + return Flux.create(sink -> flux.subscribe(dataBuffer -> { try { ByteBuffer byteBuffer = dataBuffer.asByteBuffer(); while (byteBuffer.hasRemaining()) { channel.write(byteBuffer); } - release(dataBuffer); + sink.next(dataBuffer); } catch (IOException ex) { sink.error(ex); @@ -185,22 +197,23 @@ public abstract class DataBufferUtils { }, sink::error, - sink::success)); + sink::complete)); } /** * Write the given stream of {@link DataBuffer}s to the given {@code AsynchronousFileChannel}. - * Does not close the channel when the flux is terminated, but - * does {@linkplain #release(DataBuffer) release} the data buffers in the - * source. - *

Note that the writing process does not start until the returned {@code Mono} is subscribed + * Does not close the channel when the flux is terminated, and does + * not {@linkplain #release(DataBuffer) release} the data buffers in the + * source. If releasing is required, then subscribe to the returned {@code Flux} with a + * {@link #releaseConsumer()}. + *

Note that the writing process does not start until the returned {@code Flux} is subscribed * to. * @param source the stream of data buffers to be written * @param channel the channel to write to - * @return a mono that starts the writing process when subscribed to, and that indicates the - * completion of the process + * @return a flux containing the same buffers as in {@code source}, that starts the writing + * process when subscribed to, and that publishes any writing errors and the completion signal */ - public static Mono write(Publisher source, AsynchronousFileChannel channel, + public static Flux write(Publisher source, AsynchronousFileChannel channel, long position) { Assert.notNull(source, "'source' must not be null"); @@ -209,7 +222,7 @@ public abstract class DataBufferUtils { Flux flux = Flux.from(source); - return Mono.create(sink -> { + return Flux.create(sink -> { BaseSubscriber subscriber = new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position); flux.subscribe(subscriber); @@ -259,6 +272,10 @@ public abstract class DataBufferUtils { }); } + //--------------------------------------------------------------------- + // Various + //--------------------------------------------------------------------- + /** * Skip buffers from the given {@link Publisher} until the total * {@linkplain DataBuffer#readableByteCount() byte count} reaches @@ -322,6 +339,14 @@ public abstract class DataBufferUtils { return false; } + /** + * Returns a consumer that calls {@link #release(DataBuffer)} on all + * passed data buffers. + */ + public static Consumer releaseConsumer() { + return RELEASE_CONSUMER; + } + private static class ReadableByteChannelGenerator implements BiFunction, ReadableByteChannel> { @@ -425,17 +450,19 @@ public abstract class DataBufferUtils { extends BaseSubscriber implements CompletionHandler { - private final MonoSink sink; + private final FluxSink sink; private final AsynchronousFileChannel channel; + private final AtomicBoolean completed = new AtomicBoolean(); + private long position; @Nullable private DataBuffer dataBuffer; public AsynchronousFileChannelWriteCompletionHandler( - MonoSink sink, AsynchronousFileChannel channel, long position) { + FluxSink sink, AsynchronousFileChannel channel, long position) { this.sink = sink; this.channel = channel; this.position = position; @@ -461,7 +488,7 @@ public abstract class DataBufferUtils { @Override protected void hookOnComplete() { - this.sink.success(); + this.completed.set(true); } @Override @@ -469,12 +496,17 @@ public abstract class DataBufferUtils { this.position += written; if (byteBuffer.hasRemaining()) { this.channel.write(byteBuffer, this.position, byteBuffer, this); + return; + } + else if (this.dataBuffer != null) { + this.sink.next(this.dataBuffer); + } + if (this.completed.get()) { + this.sink.complete(); } else { - release(this.dataBuffer); request(1); } - } @Override diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index c56cbe6fb10..953708dd677 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -16,6 +16,7 @@ package org.springframework.core.io.buffer; +import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; @@ -30,7 +31,6 @@ import java.util.stream.Collectors; import org.junit.Test; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import static org.junit.Assert.*; @@ -129,8 +129,12 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { Path tempFile = Files.createTempFile("DataBufferUtilsTests", null); OutputStream os = Files.newOutputStream(tempFile); - Mono writeResult = DataBufferUtils.write(flux, os); + Flux writeResult = DataBufferUtils.write(flux, os); StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) .expectComplete() .verify(); @@ -153,8 +157,12 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { Path tempFile = Files.createTempFile("DataBufferUtilsTests", null); WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); - Mono writeResult = DataBufferUtils.write(flux, channel); + Flux writeResult = DataBufferUtils.write(flux, channel); StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) .expectComplete() .verify(); @@ -178,8 +186,12 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { AsynchronousFileChannel channel = AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE); - Mono writeResult = DataBufferUtils.write(flux, channel, 0); + Flux writeResult = DataBufferUtils.write(flux, channel, 0); StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) .expectComplete() .verify(); @@ -236,4 +248,28 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .verify(); } + @Test + public void releaseConsumer() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + + flux.subscribe(DataBufferUtils.releaseConsumer()); + + // AbstractDataBufferAllocatingTestCase.LeakDetector will assert the release of the buffers + } + + public void foo() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataBufferUtils.write(flux, bos) + .subscribe(DataBufferUtils.releaseConsumer()); + + + } + }