Merge branch '5.2.x'

This commit is contained in:
Rossen Stoyanchev 2020-06-23 20:47:49 +01:00
commit 19fb0f113b
2 changed files with 61 additions and 26 deletions

View File

@ -116,8 +116,8 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
.next()
.doOnCancel(() -> {
ReactorClientHttpResponse response = responseRef.get();
if (response != null && response.bodyNotSubscribed()) {
response.getConnection().dispose();
if (response != null) {
response.releaseAfterCancel(method);
}
});
}

View File

@ -20,6 +20,9 @@ import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import io.netty.buffer.ByteBufAllocator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.netty.Connection;
import reactor.netty.NettyInbound;
@ -28,6 +31,7 @@ import reactor.netty.http.client.HttpClientResponse;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.CollectionUtils;
@ -43,19 +47,21 @@ import org.springframework.util.MultiValueMap;
*/
class ReactorClientHttpResponse implements ClientHttpResponse {
private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class);
private final HttpClientResponse response;
private final HttpHeaders headers;
private final NettyInbound inbound;
private final NettyDataBufferFactory bufferFactory;
private final Connection connection;
private final HttpHeaders headers;
// 0 - not subscribed, 1 - subscribed, 2 - cancelled
// 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe)
private final AtomicInteger state = new AtomicInteger(0);
private final String logPrefix;
/**
* Constructor that matches the inputs from
@ -64,11 +70,25 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
*/
public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) {
this.response = response;
this.inbound = connection.inbound();
this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
this.connection = connection;
MultiValueMap<String, String> adapter = new NettyHeadersAdapter(response.responseHeaders());
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
this.inbound = connection.inbound();
this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
this.logPrefix = (logger.isDebugEnabled() ? "[" + connection.channel().id().asShortText() + "] " : "");
}
/**
* Constructor with inputs extracted from a {@link Connection}.
* @deprecated as of 5.2.8
*/
@Deprecated
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
this.response = response;
MultiValueMap<String, String> adapter = new NettyHeadersAdapter(response.responseHeaders());
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
this.inbound = inbound;
this.bufferFactory = new NettyDataBufferFactory(alloc);
this.logPrefix = "";
}
@ -76,14 +96,20 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
public Flux<DataBuffer> getBody() {
return this.inbound.receive()
.doOnSubscribe(s -> {
if (!this.state.compareAndSet(0, 1)) {
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to reject once in cancelled state.
if (this.state.get() == 2) {
throw new IllegalStateException("The client response body can only be consumed once.");
}
if (this.state.compareAndSet(0, 1)) {
return;
}
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to reject once in cancelled state.
if (this.state.get() == 2) {
throw new IllegalStateException(
"The client response body can only be consumed once.");
}
else if (this.state.get() == 3) {
throw new IllegalStateException(
"The client response body has been released already due to cancellation.");
}
})
.doOnCancel(() -> this.state.compareAndSet(1, 2))
@ -113,6 +139,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
this.response.cookies().values().stream().flatMap(Collection::stream)
.forEach(c ->
result.add(c.name(), ResponseCookie.fromClientResponse(c.name(), c.value())
.domain(c.domain())
.path(c.path())
@ -124,17 +151,25 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
}
/**
* For use by {@link ReactorClientHttpConnector}.
* Called by {@link ReactorClientHttpConnector} when a cancellation is detected
* but the content has not been subscribed to. If the subscription never
* materializes then the content will remain not drained. Or it could still
* materialize if the cancellation happened very early, or the response
* reading was delayed for some reason.
*/
boolean bodyNotSubscribed() {
return this.state.get() == 0;
void releaseAfterCancel(HttpMethod method) {
if (mayHaveBody(method) && this.state.compareAndSet(0, 3)) {
if (logger.isDebugEnabled()) {
logger.debug(this.logPrefix + "Releasing body, not yet subscribed.");
}
this.inbound.receive().doOnNext(byteBuf -> {}).subscribe(byteBuf -> {}, ex -> {});
}
}
/**
* For use by {@link ReactorClientHttpConnector}.
*/
Connection getConnection() {
return this.connection;
private boolean mayHaveBody(HttpMethod method) {
int code = this.getRawStatusCode();
return !((code >= 100 && code < 200) || code == 204 || code == 205 ||
method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0);
}
@Override