Support back pressure in DataBufferUtils::readAsynchronousFileChannel

This commit adds support for back pressure in the ReadCompletionHandler,
as used by DataBufferUtils::readAsynchronousFileChannel.

See gh-23518
This commit is contained in:
Arjen Poutsma 2019-08-29 16:07:22 +02:00
parent 4f4b9f6b1b
commit 2c5958e191
1 changed files with 43 additions and 24 deletions

View File

@ -139,10 +139,8 @@ public abstract class DataBufferUtils {
channel -> Flux.create(sink -> { channel -> Flux.create(sink -> {
ReadCompletionHandler handler = ReadCompletionHandler handler =
new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize);
sink.onDispose(handler::dispose); sink.onCancel(handler::cancel);
DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize); sink.onRequest(handler::request);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
channel.read(byteBuffer, position, dataBuffer, handler);
}), }),
channel -> { channel -> {
// Do not close channel from here, rather wait for the current read callback // Do not close channel from here, rather wait for the current read callback
@ -506,7 +504,9 @@ public abstract class DataBufferUtils {
private final AtomicLong position; private final AtomicLong position;
private final AtomicBoolean disposed = new AtomicBoolean(); private final AtomicBoolean reading = new AtomicBoolean();
private final AtomicBoolean canceled = new AtomicBoolean();
public ReadCompletionHandler(AsynchronousFileChannel channel, public ReadCompletionHandler(AsynchronousFileChannel channel,
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) { FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
@ -518,43 +518,62 @@ public abstract class DataBufferUtils {
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
} }
public void read() {
if (this.sink.requestedFromDownstream() > 0 && this.reading.compareAndSet(false, true)) {
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize);
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this);
}
}
@Override @Override
public void completed(Integer read, DataBuffer dataBuffer) { public void completed(Integer read, DataBuffer dataBuffer) {
if (read != -1 && !this.disposed.get()) { this.reading.set(false);
long pos = this.position.addAndGet(read); if (!isCanceled()) {
dataBuffer.writePosition(read); if (read != -1) {
this.sink.next(dataBuffer); this.position.addAndGet(read);
// onNext may have led to onCancel (e.g. downstream takeUntil) dataBuffer.writePosition(read);
if (this.disposed.get()) { this.sink.next(dataBuffer);
complete(); read();
} }
else { else {
DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); release(dataBuffer);
ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize); closeChannel(this.channel);
this.channel.read(newByteBuffer, pos, newDataBuffer, this); this.sink.complete();
} }
} }
else { else {
release(dataBuffer); release(dataBuffer);
complete(); closeChannel(this.channel);
} }
} }
private void complete() {
this.sink.complete();
closeChannel(this.channel);
}
@Override @Override
public void failed(Throwable exc, DataBuffer dataBuffer) { public void failed(Throwable exc, DataBuffer dataBuffer) {
this.reading.set(false);
release(dataBuffer); release(dataBuffer);
this.sink.error(exc);
closeChannel(this.channel); closeChannel(this.channel);
if (!isCanceled()) {
this.sink.error(exc);
}
} }
public void dispose() { public void request(long n) {
this.disposed.set(true); read();
} }
public void cancel() {
if (this.canceled.compareAndSet(false, true)) {
if (!this.reading.get()) {
closeChannel(this.channel);
}
}
}
private boolean isCanceled() {
return this.canceled.get();
}
} }