diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index b001022966d..cd7bbe70cf5 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -116,8 +116,8 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { .next() .doOnCancel(() -> { ReactorClientHttpResponse response = responseRef.get(); - if (response != null && response.bodyNotSubscribed()) { - response.getConnection().dispose(); + if (response != null) { + response.releaseAfterCancel(method); } }); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 7b507544455..e0615dbc2c7 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -20,6 +20,9 @@ import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; +import io.netty.buffer.ByteBufAllocator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.netty.Connection; import reactor.netty.NettyInbound; @@ -28,6 +31,7 @@ import reactor.netty.http.client.HttpClientResponse; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.util.CollectionUtils; @@ -43,19 +47,21 @@ import org.springframework.util.MultiValueMap; */ class ReactorClientHttpResponse implements ClientHttpResponse { + private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class); + private final HttpClientResponse response; + private final HttpHeaders headers; + private final NettyInbound inbound; private final NettyDataBufferFactory bufferFactory; - private final Connection connection; - - private final HttpHeaders headers; - - // 0 - not subscribed, 1 - subscribed, 2 - cancelled + // 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe) private final AtomicInteger state = new AtomicInteger(0); + private final String logPrefix; + /** * Constructor that matches the inputs from @@ -64,11 +70,25 @@ class ReactorClientHttpResponse implements ClientHttpResponse { */ public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) { this.response = response; - this.inbound = connection.inbound(); - this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc()); - this.connection = connection; MultiValueMap adapter = new NettyHeadersAdapter(response.responseHeaders()); this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); + this.inbound = connection.inbound(); + this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc()); + this.logPrefix = (logger.isDebugEnabled() ? "[" + connection.channel().id().asShortText() + "] " : ""); + } + + /** + * Constructor with inputs extracted from a {@link Connection}. + * @deprecated as of 5.2.8 + */ + @Deprecated + public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) { + this.response = response; + MultiValueMap adapter = new NettyHeadersAdapter(response.responseHeaders()); + this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); + this.inbound = inbound; + this.bufferFactory = new NettyDataBufferFactory(alloc); + this.logPrefix = ""; } @@ -76,14 +96,20 @@ class ReactorClientHttpResponse implements ClientHttpResponse { public Flux getBody() { return this.inbound.receive() .doOnSubscribe(s -> { - if (!this.state.compareAndSet(0, 1)) { - // https://github.com/reactor/reactor-netty/issues/503 - // FluxReceive rejects multiple subscribers, but not after a cancel(). - // Subsequent subscribers after cancel() will not be rejected, but will hang instead. - // So we need to reject once in cancelled state. - if (this.state.get() == 2) { - throw new IllegalStateException("The client response body can only be consumed once."); - } + if (this.state.compareAndSet(0, 1)) { + return; + } + // https://github.com/reactor/reactor-netty/issues/503 + // FluxReceive rejects multiple subscribers, but not after a cancel(). + // Subsequent subscribers after cancel() will not be rejected, but will hang instead. + // So we need to reject once in cancelled state. + if (this.state.get() == 2) { + throw new IllegalStateException( + "The client response body can only be consumed once."); + } + else if (this.state.get() == 3) { + throw new IllegalStateException( + "The client response body has been released already due to cancellation."); } }) .doOnCancel(() -> this.state.compareAndSet(1, 2)) @@ -113,6 +139,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse { MultiValueMap result = new LinkedMultiValueMap<>(); this.response.cookies().values().stream().flatMap(Collection::stream) .forEach(c -> + result.add(c.name(), ResponseCookie.fromClientResponse(c.name(), c.value()) .domain(c.domain()) .path(c.path()) @@ -124,17 +151,25 @@ class ReactorClientHttpResponse implements ClientHttpResponse { } /** - * For use by {@link ReactorClientHttpConnector}. + * Called by {@link ReactorClientHttpConnector} when a cancellation is detected + * but the content has not been subscribed to. If the subscription never + * materializes then the content will remain not drained. Or it could still + * materialize if the cancellation happened very early, or the response + * reading was delayed for some reason. */ - boolean bodyNotSubscribed() { - return this.state.get() == 0; + void releaseAfterCancel(HttpMethod method) { + if (mayHaveBody(method) && this.state.compareAndSet(0, 3)) { + if (logger.isDebugEnabled()) { + logger.debug(this.logPrefix + "Releasing body, not yet subscribed."); + } + this.inbound.receive().doOnNext(byteBuf -> {}).subscribe(byteBuf -> {}, ex -> {}); + } } - /** - * For use by {@link ReactorClientHttpConnector}. - */ - Connection getConnection() { - return this.connection; + private boolean mayHaveBody(HttpMethod method) { + int code = this.getRawStatusCode(); + return !((code >= 100 && code < 200) || code == 204 || code == 205 || + method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0); } @Override