diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index cfedb647470..24a90223ad3 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -683,9 +683,7 @@ public abstract class DataBufferUtils { private final AtomicLong position; - private final AtomicBoolean reading = new AtomicBoolean(); - - private final AtomicBoolean disposed = new AtomicBoolean(); + private final AtomicReference state = new AtomicReference<>(State.IDLE); public ReadCompletionHandler(AsynchronousFileChannel channel, FluxSink sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) { @@ -697,39 +695,68 @@ public abstract class DataBufferUtils { this.bufferSize = bufferSize; } - public void read() { - if (this.sink.requestedFromDownstream() > 0 && - isNotDisposed() && - this.reading.compareAndSet(false, true)) { - DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); - this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); + /** + * Invoked when Reactive Streams consumer signals demand. + */ + public void request(long n) { + tryRead(); + } + + /** + * Invoked when Reactive Streams consumer cancels. + */ + public void cancel() { + this.state.getAndSet(State.DISPOSED); + + // According java.nio.channels.AsynchronousChannel "if an I/O operation is outstanding + // on the channel and the channel's close method is invoked, then the I/O operation + // fails with the exception AsynchronousCloseException". That should invoke the failed + // callback below which and the current DataBuffer should be released. + + closeChannel(this.channel); + } + + private void tryRead() { + if (this.sink.requestedFromDownstream() > 0 && this.state.compareAndSet(State.IDLE, State.READING)) { + read(); } } + private void read() { + DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); + ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); + this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); + } + @Override public void completed(Integer read, DataBuffer dataBuffer) { - if (isNotDisposed()) { - if (read != -1) { - this.position.addAndGet(read); - dataBuffer.writePosition(read); - this.sink.next(dataBuffer); - this.reading.set(false); - read(); - } - else { - release(dataBuffer); - closeChannel(this.channel); - if (this.disposed.compareAndSet(false, true)) { - this.sink.complete(); - } - this.reading.set(false); - } - } - else { + if (this.state.get().equals(State.DISPOSED)) { release(dataBuffer); closeChannel(this.channel); - this.reading.set(false); + return; + } + + if (read == -1) { + release(dataBuffer); + closeChannel(this.channel); + this.state.set(State.DISPOSED); + this.sink.complete(); + return; + } + + this.position.addAndGet(read); + dataBuffer.writePosition(read); + this.sink.next(dataBuffer); + + // Stay in READING mode if there is demand + if (this.sink.requestedFromDownstream() > 0) { + read(); + return; + } + + // Release READING mode and then try again in case of concurrent "request" + if (this.state.compareAndSet(State.READING, State.IDLE)) { + tryRead(); } } @@ -737,26 +764,12 @@ public abstract class DataBufferUtils { public void failed(Throwable exc, DataBuffer dataBuffer) { release(dataBuffer); closeChannel(this.channel); - if (this.disposed.compareAndSet(false, true)) { - this.sink.error(exc); - } - this.reading.set(false); + this.state.set(State.DISPOSED); + this.sink.error(exc); } - public void request(long n) { - read(); - } - - public void cancel() { - if (this.disposed.compareAndSet(false, true)) { - if (!this.reading.get()) { - closeChannel(this.channel); - } - } - } - - private boolean isNotDisposed() { - return !this.disposed.get(); + private enum State { + IDLE, READING, DISPOSED } }