From 21d069695f301851254ff2d03b44478303f24806 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 23 Jun 2020 19:58:57 +0100 Subject: [PATCH] Refine solution for 21de09 The following was reported after the change and is related to it: https://github.com/reactor/reactor-netty/issues/1170. An HTTP HEAD with the body not consumed. Connection is disposed and closed leading to subsequent request to fail. Adding toBodilessEntity() helps. This change does not close the connection but rather drains the body which does not impact subsequent re-use of the connection. This however may compete with a late subscriber actually attempting to read the response. At that point there is little choice but to raise an ISE with a more specific description. See gh-25216 --- .../reactive/ReactorClientHttpConnector.java | 4 +- .../reactive/ReactorClientHttpResponse.java | 62 ++++++++++++------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index e2af34a6208..4b635cfba42 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -115,8 +115,8 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { .next() .doOnCancel(() -> { ReactorClientHttpResponse response = responseRef.get(); - if (response != null && response.bodyNotSubscribed()) { - response.getConnection().dispose(); + if (response != null) { + response.releaseAfterCancel(method); } }); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 7872b5020ac..678a68f72f6 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import io.netty.buffer.ByteBufAllocator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.netty.Connection; import reactor.netty.NettyInbound; @@ -29,10 +31,10 @@ import reactor.netty.http.client.HttpClientResponse; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.lang.Nullable; -import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -46,6 +48,8 @@ import org.springframework.util.MultiValueMap; */ class ReactorClientHttpResponse implements ClientHttpResponse { + private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class); + private final NettyDataBufferFactory bufferFactory; private final HttpClientResponse response; @@ -53,9 +57,9 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private final NettyInbound inbound; @Nullable - private final Connection connection; + private final String logPrefix; - // 0 - not subscribed, 1 - subscribed, 2 - cancelled + // 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe) private final AtomicInteger state = new AtomicInteger(0); @@ -68,7 +72,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse { this.response = response; this.inbound = connection.inbound(); this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc()); - this.connection = connection; + this.logPrefix = (logger.isDebugEnabled() ? "[" + connection.channel().id().asShortText() + "] " : ""); } /** @@ -80,7 +84,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse { this.response = response; this.inbound = inbound; this.bufferFactory = new NettyDataBufferFactory(alloc); - this.connection = null; + this.logPrefix = ""; } @@ -88,14 +92,20 @@ class ReactorClientHttpResponse implements ClientHttpResponse { public Flux getBody() { return this.inbound.receive() .doOnSubscribe(s -> { - if (!this.state.compareAndSet(0, 1)) { - // https://github.com/reactor/reactor-netty/issues/503 - // FluxReceive rejects multiple subscribers, but not after a cancel(). - // Subsequent subscribers after cancel() will not be rejected, but will hang instead. - // So we need to reject once in cancelled state. - if (this.state.get() == 2) { - throw new IllegalStateException("The client response body can only be consumed once."); - } + if (this.state.compareAndSet(0, 1)) { + return; + } + // https://github.com/reactor/reactor-netty/issues/503 + // FluxReceive rejects multiple subscribers, but not after a cancel(). + // Subsequent subscribers after cancel() will not be rejected, but will hang instead. + // So we need to reject once in cancelled state. + if (this.state.get() == 2) { + throw new IllegalStateException( + "The client response body can only be consumed once."); + } + else if (this.state.get() == 3) { + throw new IllegalStateException( + "The client response body has been released already due to cancellation."); } }) .doOnCancel(() -> this.state.compareAndSet(1, 2)) @@ -127,6 +137,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse { MultiValueMap result = new LinkedMultiValueMap<>(); this.response.cookies().values().stream().flatMap(Collection::stream) .forEach(c -> + result.add(c.name(), ResponseCookie.fromClientResponse(c.name(), c.value()) .domain(c.domain()) .path(c.path()) @@ -138,18 +149,25 @@ class ReactorClientHttpResponse implements ClientHttpResponse { } /** - * For use by {@link ReactorClientHttpConnector}. + * Called by {@link ReactorClientHttpConnector} when a cancellation is detected + * but the content has not been subscribed to. If the subscription never + * materializes then the content will remain not drained. Or it could still + * materialize if the cancellation happened very early, or the response + * reading was delayed for some reason. */ - boolean bodyNotSubscribed() { - return this.state.get() == 0; + void releaseAfterCancel(HttpMethod method) { + if (mayHaveBody(method) && this.state.compareAndSet(0, 3)) { + if (logger.isDebugEnabled()) { + logger.debug(this.logPrefix + "Releasing body, not yet subscribed."); + } + this.inbound.receive().doOnNext(byteBuf -> {}).subscribe(byteBuf -> {}, ex -> {}); + } } - /** - * For use by {@link ReactorClientHttpConnector}. - */ - Connection getConnection() { - Assert.notNull(this.connection, "Constructor with connection wasn't used"); - return this.connection; + private boolean mayHaveBody(HttpMethod method) { + int code = this.getRawStatusCode(); + return !((code >= 100 && code < 200) || code == 204 || code == 205 || + method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0); } @Override