Change write methods to return Flux<DataBuffer>

This commit changes the write methods to return `Flux<DataBuffer>`
instead of `Mono<Void>`, giving the caller access to the original buffers
so that it can decide whether the buffers need to be released or not.
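
As an illustration of the new contract (this sketch is not part of the commit itself;
the buffer factory, the sample data and the in-memory output stream are assumed), a
caller now subscribes to the returned flux and decides how to release the written
buffers, for example with `releaseConsumer()`:

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import reactor.core.publisher.Flux;

    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.core.io.buffer.DefaultDataBufferFactory;

    public class WriteAndReleaseSketch {

        public static void main(String[] args) {
            DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
            Flux<DataBuffer> source = Flux.just(
                    bufferFactory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))),
                    bufferFactory.wrap(ByteBuffer.wrap("bar".getBytes(StandardCharsets.UTF_8))));

            ByteArrayOutputStream out = new ByteArrayOutputStream();

            // write(..) now passes the written buffers through instead of returning Mono<Void>;
            // releasing them is the subscriber's responsibility, here via releaseConsumer()
            DataBufferUtils.write(source, out)
                    .subscribe(DataBufferUtils.releaseConsumer());
        }
    }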

Issue: SPR-15726
Arjen Poutsma 2017-07-14 18:55:04 +02:00
parent efc5b47b9a
commit c802827f0f
2 changed files with 104 additions and 36 deletions

org/springframework/core/io/buffer/DataBufferUtils.java

@@ -26,16 +26,16 @@ import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SynchronousSink;
import org.springframework.lang.Nullable;
@@ -50,6 +50,12 @@ import org.springframework.util.Assert;
*/
public abstract class DataBufferUtils {
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
//---------------------------------------------------------------------
// Reading
//---------------------------------------------------------------------
/**
* Read the given {@code InputStream} into a {@code Flux} of
* {@code DataBuffer}s. Closes the input stream when the flux is terminated.
@@ -128,19 +134,24 @@ public abstract class DataBufferUtils {
});
}
//---------------------------------------------------------------------
// Writing
//---------------------------------------------------------------------
/**
* Write the given stream of {@link DataBuffer}s to the given {@code OutputStream}. Does
* <strong>not</strong> close the output stream when the flux is terminated, but
* <strong>does</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source.
* <p>Note that the writing process does not start until the returned {@code Mono} is subscribed
* <strong>not</strong> close the output stream when the flux is terminated, and does
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
* {@link #releaseConsumer()}.
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
* to.
* @param source the stream of data buffers to be written
* @param outputStream the output stream to write to
* @return a mono that starts the writing process when subscribed to, and that indicates the
* completion of the process
* @return a flux containing the same buffers as in {@code source}, that starts the writing
* process when subscribed to, and that publishes any writing errors and the completion signal
*/
public static Mono<Void> write(Publisher<DataBuffer> source,
public static Flux<DataBuffer> write(Publisher<DataBuffer> source,
OutputStream outputStream) {
Assert.notNull(source, "'source' must not be null");
@@ -152,17 +163,18 @@ public abstract class DataBufferUtils {
/**
* Write the given stream of {@link DataBuffer}s to the given {@code WritableByteChannel}. Does
* <strong>not</strong> close the channel when the flux is terminated, but
* <strong>does</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source.
* <p>Note that the writing process does not start until the returned {@code Mono} is subscribed
* <strong>not</strong> close the channel when the flux is terminated, and does
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
* {@link #releaseConsumer()}.
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
* to.
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @return a mono that starts the writing process when subscribed to, and that indicates the
* completion of the process
* @return a flux containing the same buffers as in {@code source}, that starts the writing
* process when subscribed to, and that publishes any writing errors and the completion signal
*/
public static Mono<Void> write(Publisher<DataBuffer> source,
public static Flux<DataBuffer> write(Publisher<DataBuffer> source,
WritableByteChannel channel) {
Assert.notNull(source, "'source' must not be null");
@@ -170,14 +182,14 @@ public abstract class DataBufferUtils {
Flux<DataBuffer> flux = Flux.from(source);
return Mono.create(sink ->
return Flux.create(sink ->
flux.subscribe(dataBuffer -> {
try {
ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
while (byteBuffer.hasRemaining()) {
channel.write(byteBuffer);
}
release(dataBuffer);
sink.next(dataBuffer);
}
catch (IOException ex) {
sink.error(ex);
@@ -185,22 +197,23 @@ public abstract class DataBufferUtils {
},
sink::error,
sink::success));
sink::complete));
}
/**
* Write the given stream of {@link DataBuffer}s to the given {@code AsynchronousFileChannel}.
* Does <strong>not</strong> close the channel when the flux is terminated, but
* <strong>does</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source.
* <p>Note that the writing process does not start until the returned {@code Mono} is subscribed
* Does <strong>not</strong> close the channel when the flux is terminated, and does
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
* {@link #releaseConsumer()}.
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
* to.
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @return a mono that starts the writing process when subscribed to, and that indicates the
* completion of the process
* @return a flux containing the same buffers as in {@code source}, that starts the writing
* process when subscribed to, and that publishes any writing errors and the completion signal
*/
public static Mono<Void> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel,
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel,
long position) {
Assert.notNull(source, "'source' must not be null");
@@ -209,7 +222,7 @@ public abstract class DataBufferUtils {
Flux<DataBuffer> flux = Flux.from(source);
return Mono.create(sink -> {
return Flux.create(sink -> {
BaseSubscriber<DataBuffer> subscriber =
new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
flux.subscribe(subscriber);
@@ -259,6 +272,10 @@ public abstract class DataBufferUtils {
});
}
//---------------------------------------------------------------------
// Various
//---------------------------------------------------------------------
/**
* Skip buffers from the given {@link Publisher} until the total
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
@@ -322,6 +339,14 @@ public abstract class DataBufferUtils {
return false;
}
/**
* Returns a consumer that calls {@link #release(DataBuffer)} on all
* passed data buffers.
*/
public static Consumer<DataBuffer> releaseConsumer() {
return RELEASE_CONSUMER;
}
private static class ReadableByteChannelGenerator
implements BiFunction<ReadableByteChannel, SynchronousSink<DataBuffer>, ReadableByteChannel> {
@@ -425,17 +450,19 @@ public abstract class DataBufferUtils {
extends BaseSubscriber<DataBuffer>
implements CompletionHandler<Integer, ByteBuffer> {
private final MonoSink<Void> sink;
private final FluxSink<DataBuffer> sink;
private final AsynchronousFileChannel channel;
private final AtomicBoolean completed = new AtomicBoolean();
private long position;
@Nullable
private DataBuffer dataBuffer;
public AsynchronousFileChannelWriteCompletionHandler(
MonoSink<Void> sink, AsynchronousFileChannel channel, long position) {
FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {
this.sink = sink;
this.channel = channel;
this.position = position;
@@ -461,7 +488,7 @@ public abstract class DataBufferUtils {
@Override
protected void hookOnComplete() {
this.sink.success();
this.completed.set(true);
}
@Override
@@ -469,12 +496,17 @@ public abstract class DataBufferUtils {
this.position += written;
if (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer, this.position, byteBuffer, this);
return;
}
else if (this.dataBuffer != null) {
this.sink.next(this.dataBuffer);
}
if (this.completed.get()) {
this.sink.complete();
}
else {
release(this.dataBuffer);
request(1);
}
}
@Override

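As the updated javadoc above points out, the write methods no longer release the
buffers or signal completion through a Mono. A caller that previously relied on the
Mono<Void> return can approximate the old behavior itself; the adapter below is only
a sketch (the class and method names are made up for illustration):

    import java.nio.channels.WritableByteChannel;

    import org.reactivestreams.Publisher;
    import reactor.core.publisher.Mono;

    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;

    public final class WriteCompletionAdapter {

        private WriteCompletionAdapter() {
        }

        // Release every written buffer and reduce the flux to a completion signal,
        // approximating the pre-commit Mono<Void> contract; errors still propagate.
        public static Mono<Void> writeAndRelease(Publisher<DataBuffer> source, WritableByteChannel channel) {
            return DataBufferUtils.write(source, channel)
                    .map(DataBufferUtils::release)
                    .then();
        }
    }
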
org/springframework/core/io/buffer/DataBufferUtilsTests.java

@@ -16,6 +16,7 @@
package org.springframework.core.io.buffer;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
@@ -30,7 +31,6 @@ import java.util.stream.Collectors;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.junit.Assert.*;
@@ -129,8 +129,12 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
Path tempFile = Files.createTempFile("DataBufferUtilsTests", null);
OutputStream os = Files.newOutputStream(tempFile);
Mono<Void> writeResult = DataBufferUtils.write(flux, os);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, os);
StepVerifier.create(writeResult)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify();
@@ -153,8 +157,12 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
Path tempFile = Files.createTempFile("DataBufferUtilsTests", null);
WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE);
Mono<Void> writeResult = DataBufferUtils.write(flux, channel);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
StepVerifier.create(writeResult)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify();
@@ -178,8 +186,12 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);
Mono<Void> writeResult = DataBufferUtils.write(flux, channel, 0);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel, 0);
StepVerifier.create(writeResult)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify();
@@ -236,4 +248,28 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.verify();
}
@Test
public void releaseConsumer() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
flux.subscribe(DataBufferUtils.releaseConsumer());
// AbstractDataBufferAllocatingTestCase.LeakDetector will assert the release of the buffers
}
@Test
public void foo() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataBufferUtils.write(flux, bos)
.subscribe(DataBufferUtils.releaseConsumer());
}
}
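
Finally, a standalone sketch (again not part of the commit; the file name, channel
handling and blocking call are assumed for the example) showing the same pattern
against an AsynchronousFileChannel: the caller releases each written buffer and
closes the channel once the returned flux terminates:

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    import reactor.core.publisher.Flux;

    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.core.io.buffer.DefaultDataBufferFactory;

    public class AsyncFileWriteSketch {

        public static void main(String[] args) throws Exception {
            DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
            Flux<DataBuffer> source = Flux.just(
                    factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))),
                    factory.wrap(ByteBuffer.wrap("bar".getBytes(StandardCharsets.UTF_8))));

            Path tempFile = Files.createTempFile("AsyncFileWriteSketch", null);
            AsynchronousFileChannel channel =
                    AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);

            DataBufferUtils.write(source, channel, 0)
                    // release each buffer once it has been written
                    .doOnNext(DataBufferUtils::release)
                    // close the channel when the flux terminates, whatever the signal
                    .doFinally(signal -> {
                        try {
                            channel.close();
                        }
                        catch (Exception ex) {
                            // ignore close failures in this sketch
                        }
                    })
                    .then()
                    .block();
        }
    }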