diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java index 9e495aa3aaf..a8c304a1506 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.SequenceInputStream; import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.Enumeration; import java.util.concurrent.atomic.AtomicLong; @@ -102,8 +103,8 @@ public abstract class DataBufferUtils { Assert.notNull(inputStream, "'inputStream' must not be null"); Assert.notNull(allocator, "'allocator' must not be null"); - return Flux.create(new InputStreamConsumer(allocator, bufferSize), - subscriber -> inputStream, closeConsumer()); + ReadableByteChannel channel = Channels.newChannel(inputStream); + return read(channel, allocator, bufferSize); } @SuppressWarnings("unchecked") @@ -212,47 +213,4 @@ public abstract class DataBufferUtils { } } - private static class InputStreamConsumer - implements Consumer> { - - private final DataBufferAllocator allocator; - - private final int chunkSize; - - public InputStreamConsumer(DataBufferAllocator allocator, int chunkSize) { - this.allocator = allocator; - this.chunkSize = chunkSize; - } - - @Override - public void accept(SubscriberWithContext sub) { - try { - byte[] bytes = new byte[chunkSize]; - int read; - InputStream is = sub.context(); - if ((read = is.read(bytes)) > 0) { - boolean release = true; - DataBuffer dataBuffer = this.allocator.allocateBuffer(read); - try { - dataBuffer.write(bytes, 0, read); - release = false; - sub.onNext(dataBuffer); - } - finally { - if (release) { - // TODO: release buffer when we have PooledDataBuffer - } - } - } - else { - sub.onComplete(); - } - } - catch (IOException ex) { - sub.onError(ex); - } - - } - } - }