Polishing

This commit is contained in:
Arjen Poutsma 2016-04-21 12:46:57 +02:00
parent df3cf69a96
commit 53f8b84bcc
1 changed files with 3 additions and 45 deletions

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicLong;
@ -102,8 +103,8 @@ public abstract class DataBufferUtils {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
return Flux.create(new InputStreamConsumer(allocator, bufferSize),
subscriber -> inputStream, closeConsumer());
ReadableByteChannel channel = Channels.newChannel(inputStream);
return read(channel, allocator, bufferSize);
}
@SuppressWarnings("unchecked")
@ -212,47 +213,4 @@ public abstract class DataBufferUtils {
}
}
private static class InputStreamConsumer
implements Consumer<SubscriberWithContext<DataBuffer, InputStream>> {
private final DataBufferAllocator allocator;
private final int chunkSize;
public InputStreamConsumer(DataBufferAllocator allocator, int chunkSize) {
this.allocator = allocator;
this.chunkSize = chunkSize;
}
@Override
public void accept(SubscriberWithContext<DataBuffer, InputStream> sub) {
try {
byte[] bytes = new byte[chunkSize];
int read;
InputStream is = sub.context();
if ((read = is.read(bytes)) > 0) {
boolean release = true;
DataBuffer dataBuffer = this.allocator.allocateBuffer(read);
try {
dataBuffer.write(bytes, 0, read);
release = false;
sub.onNext(dataBuffer);
}
finally {
if (release) {
// TODO: release buffer when we have PooledDataBuffer
}
}
}
else {
sub.onComplete();
}
}
catch (IOException ex) {
sub.onError(ex);
}
}
}
}