Fix timing bug in DataBufferUtils::readAsynchronousFileChannel

This commit makes sure that reading is enabled after the current
signal has been processed, not while is is being processed. The bug
was only apparent while using the JettyClientHttpConnector, which
requests new elements continuously, even after the end of the
stream has been signalled.
This commit is contained in:
Arjen Poutsma 2019-09-02 15:13:22 +02:00
parent 9729b460f1
commit f748b1e68d
2 changed files with 27 additions and 17 deletions

View File

@ -506,7 +506,7 @@ public abstract class DataBufferUtils {
private final AtomicBoolean reading = new AtomicBoolean();
private final AtomicBoolean canceled = new AtomicBoolean();
private final AtomicBoolean disposed = new AtomicBoolean();
public ReadCompletionHandler(AsynchronousFileChannel channel,
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
@ -519,7 +519,9 @@ public abstract class DataBufferUtils {
}
public void read() {
if (this.sink.requestedFromDownstream() > 0 && this.reading.compareAndSet(false, true)) {
if (this.sink.requestedFromDownstream() > 0 &&
isNotDisposed() &&
this.reading.compareAndSet(false, true)) {
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize);
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this);
@ -528,34 +530,38 @@ public abstract class DataBufferUtils {
@Override
public void completed(Integer read, DataBuffer dataBuffer) {
this.reading.set(false);
if (!isCanceled()) {
if (isNotDisposed()) {
if (read != -1) {
this.position.addAndGet(read);
dataBuffer.writePosition(read);
this.sink.next(dataBuffer);
this.reading.set(false);
read();
}
else {
release(dataBuffer);
closeChannel(this.channel);
this.sink.complete();
if (this.disposed.compareAndSet(false, true)) {
this.sink.complete();
}
this.reading.set(false);
}
}
else {
release(dataBuffer);
closeChannel(this.channel);
this.reading.set(false);
}
}
@Override
public void failed(Throwable exc, DataBuffer dataBuffer) {
this.reading.set(false);
release(dataBuffer);
closeChannel(this.channel);
if (!isCanceled()) {
if (this.disposed.compareAndSet(false, true)) {
this.sink.error(exc);
}
this.reading.set(false);
}
public void request(long n) {
@ -563,15 +569,15 @@ public abstract class DataBufferUtils {
}
public void cancel() {
if (this.canceled.compareAndSet(false, true)) {
if (this.disposed.compareAndSet(false, true)) {
if (!this.reading.get()) {
closeChannel(this.channel);
}
}
}
private boolean isCanceled() {
return this.canceled.get();
private boolean isNotDisposed() {
return !this.disposed.get();
}
}

View File

@ -18,7 +18,9 @@ package org.springframework.web.reactive.function.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
@ -406,7 +408,6 @@ public class WebClientIntegrationTests {
prepareResponse(response -> {});
Resource resource = new ClassPathResource("largeTextFile.txt", getClass());
byte[] expected = Files.readAllBytes(resource.getFile().toPath());
Flux<DataBuffer> body = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096);
Mono<Void> result = this.webClient.post()
@ -415,18 +416,21 @@ public class WebClientIntegrationTests {
.retrieve()
.bodyToMono(Void.class);
StepVerifier.create(result).verifyComplete();
StepVerifier.create(result)
.expectComplete()
.verify(Duration.ofSeconds(5));
expectRequest(request -> {
ByteArrayOutputStream actual = new ByteArrayOutputStream();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
request.getBody().copyTo(actual);
request.getBody().copyTo(bos);
String actual = bos.toString("UTF-8");
String expected = new String(Files.readAllBytes(resource.getFile().toPath()), StandardCharsets.UTF_8);
assertEquals(expected, actual);
}
catch (IOException ex) {
throw new IllegalStateException(ex);
throw new UncheckedIOException(ex);
}
assertEquals(expected.length, actual.size());
assertEquals(hash(expected), hash(actual.toByteArray()));
});
}