From c45b106d955a44a99b1c532ac59b15df68f9bf06 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 15 Oct 2020 16:49:39 +0100 Subject: [PATCH] Fix race issue and improve readAsynchronousFileChannel Before this commit, ReadCompletionHandler delayed closing the channel to allow an ongoing read to complete/fail so we can get a hold of the associated DataBuffer and release it. This can be problematic since the read take time to complete but even more importantly there was a race condition where we didn't check if we've been disposed concurrently while releasing the read flag. This commit removes the delay and closes the channel immediately from cancel() and that should in turn fail any ongoing read operation, according to AsynchronousChannel, and the DataBuffer is released. Further improvements include: - combining the "reading" and "disposed" AtomicBoolean's into a single "state" AtomicReference. - an optimistic check to remain in READING mode and avoid state switches when there is demand to continue reading. Closes gh-25831 --- .../core/io/buffer/DataBufferUtils.java | 107 ++++++++++-------- 1 file changed, 60 insertions(+), 47 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index cfedb647470..24a90223ad3 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -683,9 +683,7 @@ public abstract class DataBufferUtils { private final AtomicLong position; - private final AtomicBoolean reading = new AtomicBoolean(); - - private final AtomicBoolean disposed = new AtomicBoolean(); + private final AtomicReference state = new AtomicReference<>(State.IDLE); public ReadCompletionHandler(AsynchronousFileChannel channel, FluxSink sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) { @@ -697,39 +695,68 @@ public abstract class DataBufferUtils { this.bufferSize = bufferSize; } - public void read() { - if (this.sink.requestedFromDownstream() > 0 && - isNotDisposed() && - this.reading.compareAndSet(false, true)) { - DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); - this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); + /** + * Invoked when Reactive Streams consumer signals demand. + */ + public void request(long n) { + tryRead(); + } + + /** + * Invoked when Reactive Streams consumer cancels. + */ + public void cancel() { + this.state.getAndSet(State.DISPOSED); + + // According java.nio.channels.AsynchronousChannel "if an I/O operation is outstanding + // on the channel and the channel's close method is invoked, then the I/O operation + // fails with the exception AsynchronousCloseException". That should invoke the failed + // callback below which and the current DataBuffer should be released. + + closeChannel(this.channel); + } + + private void tryRead() { + if (this.sink.requestedFromDownstream() > 0 && this.state.compareAndSet(State.IDLE, State.READING)) { + read(); } } + private void read() { + DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); + ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); + this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); + } + @Override public void completed(Integer read, DataBuffer dataBuffer) { - if (isNotDisposed()) { - if (read != -1) { - this.position.addAndGet(read); - dataBuffer.writePosition(read); - this.sink.next(dataBuffer); - this.reading.set(false); - read(); - } - else { - release(dataBuffer); - closeChannel(this.channel); - if (this.disposed.compareAndSet(false, true)) { - this.sink.complete(); - } - this.reading.set(false); - } - } - else { + if (this.state.get().equals(State.DISPOSED)) { release(dataBuffer); closeChannel(this.channel); - this.reading.set(false); + return; + } + + if (read == -1) { + release(dataBuffer); + closeChannel(this.channel); + this.state.set(State.DISPOSED); + this.sink.complete(); + return; + } + + this.position.addAndGet(read); + dataBuffer.writePosition(read); + this.sink.next(dataBuffer); + + // Stay in READING mode if there is demand + if (this.sink.requestedFromDownstream() > 0) { + read(); + return; + } + + // Release READING mode and then try again in case of concurrent "request" + if (this.state.compareAndSet(State.READING, State.IDLE)) { + tryRead(); } } @@ -737,26 +764,12 @@ public abstract class DataBufferUtils { public void failed(Throwable exc, DataBuffer dataBuffer) { release(dataBuffer); closeChannel(this.channel); - if (this.disposed.compareAndSet(false, true)) { - this.sink.error(exc); - } - this.reading.set(false); + this.state.set(State.DISPOSED); + this.sink.error(exc); } - public void request(long n) { - read(); - } - - public void cancel() { - if (this.disposed.compareAndSet(false, true)) { - if (!this.reading.get()) { - closeChannel(this.channel); - } - } - } - - private boolean isNotDisposed() { - return !this.disposed.get(); + private enum State { + IDLE, READING, DISPOSED } }