diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index af9015db6d5..359cf189c4b 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -69,28 +69,29 @@ public abstract class DataBufferUtils { //--------------------------------------------------------------------- /** - * Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux} - * of {@code DataBuffer}s. Closes the input stream when the flux is terminated. + * Obtain a {@link InputStream} from the given supplier, and read it into a + * {@code Flux} of {@code DataBuffer}s. Closes the input stream when the + * Flux is terminated. * @param inputStreamSupplier the supplier for the input stream to read from - * @param dataBufferFactory the factory to create data buffers with + * @param bufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers - * @return a flux of data buffers read from the given channel + * @return a Flux of data buffers read from the given channel */ public static Flux readInputStream( - Callable inputStreamSupplier, DataBufferFactory dataBufferFactory, int bufferSize) { + Callable inputStreamSupplier, DataBufferFactory bufferFactory, int bufferSize) { Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null"); - - return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()), dataBufferFactory, bufferSize); + return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()), bufferFactory, bufferSize); } /** - * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a - * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated. + * Obtain a {@link ReadableByteChannel} from the given supplier, and read + * it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when + * the Flux is terminated. * @param channelSupplier the supplier for the channel to read from * @param bufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers - * @return a flux of data buffers read from the given channel + * @return a Flux of data buffers read from the given channel */ public static Flux readByteChannel( Callable channelSupplier, DataBufferFactory bufferFactory, int bufferSize) { @@ -107,12 +108,13 @@ public abstract class DataBufferUtils { } /** - * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a - * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated. + * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read + * it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when + * the Flux is terminated. * @param channelSupplier the supplier for the channel to read from * @param bufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers - * @return a flux of data buffers read from the given channel + * @return a Flux of data buffers read from the given channel */ public static Flux readAsynchronousFileChannel( Callable channelSupplier, DataBufferFactory bufferFactory, int bufferSize) { @@ -121,17 +123,18 @@ public abstract class DataBufferUtils { } /** - * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a - * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the - * channel when the flux is terminated. + * Obtain a {@code AsynchronousFileChannel} from the given supplier, and + * read it into a {@code Flux} of {@code DataBuffer}s, starting at the given + * position. Closes the channel when the Flux is terminated. * @param channelSupplier the supplier for the channel to read from * @param position the position to start reading from * @param bufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers - * @return a flux of data buffers read from the given channel + * @return a Flux of data buffers read from the given channel */ - public static Flux readAsynchronousFileChannel(Callable channelSupplier, - long position, DataBufferFactory bufferFactory, int bufferSize) { + public static Flux readAsynchronousFileChannel( + Callable channelSupplier, long position, + DataBufferFactory bufferFactory, int bufferSize) { Assert.notNull(channelSupplier, "'channelSupplier' must not be null"); Assert.notNull(bufferFactory, "'dataBufferFactory' must not be null"); @@ -165,14 +168,12 @@ public abstract class DataBufferUtils { * fall back to {@link #readByteChannel(Callable, DataBufferFactory, int)}. * Closes the channel when the flux is terminated. * @param resource the resource to read from - * @param dataBufferFactory the factory to create data buffers with + * @param bufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers - * @return a flux of data buffers read from the given channel + * @return a Flux of data buffers read from the given channel */ - public static Flux read( - Resource resource, DataBufferFactory dataBufferFactory, int bufferSize) { - - return read(resource, 0, dataBufferFactory, bufferSize); + public static Flux read(Resource resource, DataBufferFactory bufferFactory, int bufferSize) { + return read(resource, 0, bufferFactory, bufferSize); } /** @@ -185,26 +186,25 @@ public abstract class DataBufferUtils { * Closes the channel when the flux is terminated. * @param resource the resource to read from * @param position the position to start reading from - * @param dataBufferFactory the factory to create data buffers with + * @param bufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers - * @return a flux of data buffers read from the given channel + * @return a Flux of data buffers read from the given channel */ public static Flux read( - Resource resource, long position, DataBufferFactory dataBufferFactory, int bufferSize) { + Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize) { try { if (resource.isFile()) { File file = resource.getFile(); return readAsynchronousFileChannel( () -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ), - position, dataBufferFactory, bufferSize); + position, bufferFactory, bufferSize); } } catch (IOException ignore) { // fallback to resource.readableChannel(), below } - - Flux result = readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize); + Flux result = readByteChannel(resource::readableChannel, bufferFactory, bufferSize); return position == 0 ? result : skipUntilByteCount(result, position); } @@ -214,16 +214,19 @@ public abstract class DataBufferUtils { //--------------------------------------------------------------------- /** - * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code OutputStream}. Does - * not close the output stream when the flux is terminated, and does - * not {@linkplain #release(DataBuffer) release} the data buffers in the - * source. If releasing is required, then subscribe to the returned {@code Flux} with a - * {@link #releaseConsumer()}. - *

Note that the writing process does not start until the returned {@code Flux} is subscribed to. + * Write the given stream of {@link DataBuffer DataBuffers} to the given + * {@code OutputStream}. Does not close the output stream + * when the flux is terminated, and does not + * {@linkplain #release(DataBuffer) release} the data buffers in the source. + * If releasing is required, then subscribe to the returned {@code Flux} + * with a {@link #releaseConsumer()}. + *

Note that the writing process does not start until the returned + * {@code Flux} is subscribed to. * @param source the stream of data buffers to be written * @param outputStream the output stream to write to - * @return a flux containing the same buffers as in {@code source}, that starts the writing - * process when subscribed to, and that publishes any writing errors and the completion signal + * @return a Flux containing the same buffers as in {@code source}, that + * starts the writing process when subscribed to, and that publishes any + * writing errors and the completion signal */ public static Flux write(Publisher source, OutputStream outputStream) { Assert.notNull(source, "'source' must not be null"); @@ -234,16 +237,19 @@ public abstract class DataBufferUtils { } /** - * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code WritableByteChannel}. Does - * not close the channel when the flux is terminated, and does - * not {@linkplain #release(DataBuffer) release} the data buffers in the - * source. If releasing is required, then subscribe to the returned {@code Flux} with a - * {@link #releaseConsumer()}. - *

Note that the writing process does not start until the returned {@code Flux} is subscribed to. + * Write the given stream of {@link DataBuffer DataBuffers} to the given + * {@code WritableByteChannel}. Does not close the channel + * when the flux is terminated, and does not + * {@linkplain #release(DataBuffer) release} the data buffers in the source. + * If releasing is required, then subscribe to the returned {@code Flux} + * with a {@link #releaseConsumer()}. + *

Note that the writing process does not start until the returned + * {@code Flux} is subscribed to. * @param source the stream of data buffers to be written * @param channel the channel to write to - * @return a flux containing the same buffers as in {@code source}, that starts the writing - * process when subscribed to, and that publishes any writing errors and the completion signal + * @return a Flux containing the same buffers as in {@code source}, that + * starts the writing process when subscribed to, and that publishes any + * writing errors and the completion signal */ public static Flux write(Publisher source, WritableByteChannel channel) { Assert.notNull(source, "'source' must not be null"); @@ -258,16 +264,19 @@ public abstract class DataBufferUtils { } /** - * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}. - * Does not close the channel when the flux is terminated, and does - * not {@linkplain #release(DataBuffer) release} the data buffers in the - * source. If releasing is required, then subscribe to the returned {@code Flux} with a - * {@link #releaseConsumer()}. - *

Note that the writing process does not start until the returned {@code Flux} is subscribed to. + * Write the given stream of {@link DataBuffer DataBuffers} to the given + * {@code AsynchronousFileChannel}. Does not close the + * channel when the flux is terminated, and does not + * {@linkplain #release(DataBuffer) release} the data buffers in the source. + * If releasing is required, then subscribe to the returned {@code Flux} + * with a {@link #releaseConsumer()}. + *

Note that the writing process does not start until the returned + * {@code Flux} is subscribed to. * @param source the stream of data buffers to be written * @param channel the channel to write to - * @return a flux containing the same buffers as in {@code source}, that starts the writing - * process when subscribed to, and that publishes any writing errors and the completion signal + * @return a Flux containing the same buffers as in {@code source}, that + * starts the writing process when subscribed to, and that publishes any + * writing errors and the completion signal * @since 5.0.10 */ public static Flux write(Publisher source, AsynchronousFileChannel channel) { @@ -275,17 +284,20 @@ public abstract class DataBufferUtils { } /** - * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}. - * Does not close the channel when the flux is terminated, and does - * not {@linkplain #release(DataBuffer) release} the data buffers in the - * source. If releasing is required, then subscribe to the returned {@code Flux} with a + * Write the given stream of {@link DataBuffer DataBuffers} to the given + * {@code AsynchronousFileChannel}. Does not close the channel + * when the flux is terminated, and does not + * {@linkplain #release(DataBuffer) release} the data buffers in the source. + * If releasing is required, then subscribe to the returned {@code Flux} with a * {@link #releaseConsumer()}. - *

Note that the writing process does not start until the returned {@code Flux} is subscribed to. + *

Note that the writing process does not start until the returned + * {@code Flux} is subscribed to. * @param source the stream of data buffers to be written * @param channel the channel to write to - * @param position the file position at which the write is to begin; must be non-negative - * @return a flux containing the same buffers as in {@code source}, that starts the writing - * process when subscribed to, and that publishes any writing errors and the completion signal + * @param position file position write write is to begin; must be non-negative + * @return a flux containing the same buffers as in {@code source}, that + * starts the writing process when subscribed to, and that publishes any + * writing errors and the completion signal */ public static Flux write( Publisher source, AsynchronousFileChannel channel, long position) {