Merge branch '5.1.x'

This commit is contained in:
Arjen Poutsma 2019-09-02 15:19:30 +02:00
commit 3fcf4233a2
2 changed files with 27 additions and 17 deletions

View File

@ -654,7 +654,7 @@ public abstract class DataBufferUtils {
private final AtomicBoolean reading = new AtomicBoolean();
private final AtomicBoolean canceled = new AtomicBoolean();
private final AtomicBoolean disposed = new AtomicBoolean();
public ReadCompletionHandler(AsynchronousFileChannel channel,
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
@ -667,7 +667,9 @@ public abstract class DataBufferUtils {
}
public void read() {
if (this.sink.requestedFromDownstream() > 0 && this.reading.compareAndSet(false, true)) {
if (this.sink.requestedFromDownstream() > 0 &&
isNotDisposed() &&
this.reading.compareAndSet(false, true)) {
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize);
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this);
@ -676,34 +678,38 @@ public abstract class DataBufferUtils {
@Override
public void completed(Integer read, DataBuffer dataBuffer) {
this.reading.set(false);
if (!isCanceled()) {
if (isNotDisposed()) {
if (read != -1) {
this.position.addAndGet(read);
dataBuffer.writePosition(read);
this.sink.next(dataBuffer);
this.reading.set(false);
read();
}
else {
release(dataBuffer);
closeChannel(this.channel);
this.sink.complete();
if (this.disposed.compareAndSet(false, true)) {
this.sink.complete();
}
this.reading.set(false);
}
}
else {
release(dataBuffer);
closeChannel(this.channel);
this.reading.set(false);
}
}
@Override
public void failed(Throwable exc, DataBuffer dataBuffer) {
this.reading.set(false);
release(dataBuffer);
closeChannel(this.channel);
if (!isCanceled()) {
if (this.disposed.compareAndSet(false, true)) {
this.sink.error(exc);
}
this.reading.set(false);
}
public void request(long n) {
@ -711,15 +717,15 @@ public abstract class DataBufferUtils {
}
public void cancel() {
if (this.canceled.compareAndSet(false, true)) {
if (this.disposed.compareAndSet(false, true)) {
if (!this.reading.get()) {
closeChannel(this.channel);
}
}
}
private boolean isCanceled() {
return this.canceled.get();
private boolean isNotDisposed() {
return !this.disposed.get();
}
}

View File

@ -18,11 +18,13 @@ package org.springframework.web.reactive.function.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
@ -565,7 +567,6 @@ class WebClientIntegrationTests {
prepareResponse(response -> {});
Resource resource = new ClassPathResource("largeTextFile.txt", getClass());
byte[] expected = Files.readAllBytes(resource.getFile().toPath());
Flux<DataBuffer> body = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096);
Mono<Void> result = this.webClient.post()
@ -574,18 +575,21 @@ class WebClientIntegrationTests {
.retrieve()
.bodyToMono(Void.class);
StepVerifier.create(result).verifyComplete();
StepVerifier.create(result)
.expectComplete()
.verify(Duration.ofSeconds(5));
expectRequest(request -> {
ByteArrayOutputStream actual = new ByteArrayOutputStream();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
request.getBody().copyTo(actual);
request.getBody().copyTo(bos);
String actual = bos.toString("UTF-8");
String expected = new String(Files.readAllBytes(resource.getFile().toPath()), StandardCharsets.UTF_8);
assertThat(actual).isEqualTo(expected);
}
catch (IOException ex) {
throw new IllegalStateException(ex);
throw new UncheckedIOException(ex);
}
assertThat(actual.size()).isEqualTo(expected.length);
assertThat(hash(actual.toByteArray())).isEqualTo(hash(expected));
});
}