From 16f3f8d28ff16e5d88fe2d349fc78b1a6512b1cb Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Wed, 6 Sep 2017 14:06:56 +0200 Subject: [PATCH] Add close() method on HTTP client response Before this commit, there was no way to signal the HTTP client that we were done consuming the response. Without that, the underlying client library cannot know when it is safe to release the associated resources (e.g. the HTTP connection). This commit adds new `close()` methods on both `ClientHttpResponse` and `ClientResponse`. This methods is non-blocking and its behavior depends on the library, its configuration, HTTP version, etc. At the `WebClient` level, `close()` is called automatically if we consume the response body through the `ResponseSpec` or the `ClientResponse` itself. Note that it is *required* to call `close()` manually otherwise; not doing so might create resource leaks or connection issues. Issue: SPR-15920 --- .../reactive/MockClientHttpResponse.java | 15 +++ .../client/reactive/ClientHttpResponse.java | 16 ++- .../reactive/ClientHttpResponseDecorator.java | 4 + .../reactive/ReactorClientHttpResponse.java | 4 + .../reactive/test/MockClientHttpResponse.java | 16 +++ .../function/client/ClientResponse.java | 15 ++- .../client/DefaultClientResponse.java | 21 +++- .../function/client/DefaultWebClient.java | 30 +++-- .../reactive/function/client/WebClient.java | 11 ++ .../client/WebClientIntegrationTests.java | 6 + .../function/client/WebClientMockTests.java | 116 ++++++++++++++++++ 11 files changed, 238 insertions(+), 16 deletions(-) create mode 100644 spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java diff --git a/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java index b67b3a1392..0627906e2c 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java @@ -55,6 +55,8 @@ public class MockClientHttpResponse implements ClientHttpResponse { private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + private boolean closed = false; + public MockClientHttpResponse(HttpStatus status) { Assert.notNull(status, "HttpStatus is required"); @@ -94,10 +96,23 @@ public class MockClientHttpResponse implements ClientHttpResponse { return this.bufferFactory.wrap(byteBuffer); } + @Override public Flux getBody() { + if (this.closed) { + return Flux.error(new IllegalStateException("Connection has been closed.")); + } return this.body; } + @Override + public void close() { + this.closed = true; + } + + public boolean isClosed() { + return this.closed; + } + /** * Return the response body aggregated and converted to a String using the * charset of the Content-Type response or otherwise as "UTF-8". diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java index b0563927bd..3a3955a629 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java @@ -16,6 +16,8 @@ package org.springframework.http.client.reactive; +import java.io.Closeable; + import org.springframework.http.HttpStatus; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.ResponseCookie; @@ -25,9 +27,10 @@ import org.springframework.util.MultiValueMap; * Represents a client-side reactive HTTP response. * * @author Arjen Poutsma + * @author Brian Clozel * @since 5.0 */ -public interface ClientHttpResponse extends ReactiveHttpInputMessage { +public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable { /** * Return the HTTP status as an {@link HttpStatus} enum value. @@ -39,4 +42,15 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage { */ MultiValueMap getCookies(); + /** + * Close this response, freeing any resources created. + *

This non-blocking method has to be called once the response has been + * processed and the resources are no longer needed; not doing so might + * create resource leaks or connection issues. + *

Depending on the client configuration and HTTP version, + * this can lead to closing the connection or returning it to a connection pool. + */ + @Override + void close(); + } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java index b37a394e9c..c8cbf11462 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java @@ -70,6 +70,10 @@ public class ClientHttpResponseDecorator implements ClientHttpResponse { return this.delegate.getBody(); } + @Override + public void close() { + this.delegate.close(); + } @Override public String toString() { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 7674b08907..778a9a512f 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -89,6 +89,10 @@ public class ReactorClientHttpResponse implements ClientHttpResponse { return CollectionUtils.unmodifiableMultiValueMap(result); } + @Override + public void close() { + this.response.dispose(); + } @Override public String toString() { diff --git a/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java b/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java index c872d13c2e..7f89d05792 100644 --- a/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java +++ b/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java @@ -55,6 +55,7 @@ public class MockClientHttpResponse implements ClientHttpResponse { private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + private boolean closed = false; public MockClientHttpResponse(HttpStatus status) { Assert.notNull(status, "HttpStatus is required"); @@ -62,6 +63,7 @@ public class MockClientHttpResponse implements ClientHttpResponse { } + @Override public HttpStatus getStatusCode() { return this.status; } @@ -71,6 +73,7 @@ public class MockClientHttpResponse implements ClientHttpResponse { return this.headers; } + @Override public MultiValueMap getCookies() { return this.cookies; } @@ -94,10 +97,23 @@ public class MockClientHttpResponse implements ClientHttpResponse { return this.bufferFactory.wrap(byteBuffer); } + @Override public Flux getBody() { + if (this.closed) { + return Flux.error(new IllegalStateException("Connection has been closed.")); + } return this.body; } + @Override + public void close() { + this.closed = true; + } + + public boolean isClosed() { + return this.closed; + } + /** * Return the response body aggregated and converted to a String using the * charset of the Content-Type response or otherwise as "UTF-8". diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java index 268fba5121..58a1da89c7 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java @@ -16,6 +16,7 @@ package org.springframework.web.reactive.function.client; +import java.io.Closeable; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -43,7 +44,7 @@ import org.springframework.web.reactive.function.BodyExtractor; * @author Arjen Poutsma * @since 5.0 */ -public interface ClientResponse { +public interface ClientResponse extends Closeable { /** * Return the status code of this response. @@ -132,6 +133,18 @@ public interface ClientResponse { */ Mono>> toEntityList(ParameterizedTypeReference typeReference); + /** + * Close this response, freeing any resources created. + *

This non-blocking method has to be called once the response has been processed + * and the resources are no longer needed. + *

{@code ClientResponse.bodyTo*}, {@code ClientResponse.toEntity*} + * and all methods under {@code WebClient.retrieve()} will close the response + * automatically. + *

It is required to call close() manually otherwise; not doing so might + * create resource leaks or connection issues. + */ + @Override + void close(); /** * Represents the headers of the HTTP response. diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java index 664e058ce0..31d8820d25 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java @@ -42,6 +42,7 @@ import org.springframework.web.reactive.function.BodyExtractors; * Default implementation of {@link ClientResponse}. * * @author Arjen Poutsma + * @author Brian Clozel * @since 5.0 */ class DefaultClientResponse implements ClientResponse { @@ -97,22 +98,24 @@ class DefaultClientResponse implements ClientResponse { @Override public Mono bodyToMono(Class elementClass) { - return body(BodyExtractors.toMono(elementClass)); + Mono body = body(BodyExtractors.toMono(elementClass)); + return body.doOnTerminate(this.response::close); } @Override public Mono bodyToMono(ParameterizedTypeReference typeReference) { - return body(BodyExtractors.toMono(typeReference)); + return body(BodyExtractors.toMono(typeReference)).doOnTerminate(this.response::close); } @Override public Flux bodyToFlux(Class elementClass) { - return body(BodyExtractors.toFlux(elementClass)); + Flux body = body(BodyExtractors.toFlux(elementClass)); + return body.doOnTerminate(this.response::close); } @Override public Flux bodyToFlux(ParameterizedTypeReference typeReference) { - return body(BodyExtractors.toFlux(typeReference)); + return body(BodyExtractors.toFlux(typeReference)).doOnTerminate(this.response::close); } @Override @@ -131,7 +134,8 @@ class DefaultClientResponse implements ClientResponse { return bodyMono .map(body -> new ResponseEntity<>(body, headers, statusCode)) .switchIfEmpty(Mono.defer( - () -> Mono.just(new ResponseEntity<>(headers, statusCode)))); + () -> Mono.just(new ResponseEntity<>(headers, statusCode)))) + .doOnTerminate(this.response::close); } @Override @@ -150,9 +154,14 @@ class DefaultClientResponse implements ClientResponse { HttpStatus statusCode = statusCode(); return bodyFlux .collectList() - .map(body -> new ResponseEntity<>(body, headers, statusCode)); + .map(body -> new ResponseEntity<>(body, headers, statusCode)) + .doOnTerminate(this.response::close); } + @Override + public void close() { + this.response.close(); + } private class DefaultHeaders implements Headers { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index 592e96fed6..f2562afcd8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -413,7 +413,7 @@ class DefaultWebClient implements WebClient { @SuppressWarnings("unchecked") public Mono bodyToMono(Class bodyType) { return this.responseMono.flatMap( - response -> bodyToPublisher(response, BodyExtractors.toMono(bodyType), + response -> bodyToMono(response, BodyExtractors.toMono(bodyType), this::monoThrowableToMono)); } @@ -421,7 +421,7 @@ class DefaultWebClient implements WebClient { @SuppressWarnings("unchecked") public Mono bodyToMono(ParameterizedTypeReference typeReference) { return this.responseMono.flatMap( - response -> bodyToPublisher(response, BodyExtractors.toMono(typeReference), + response -> bodyToMono(response, BodyExtractors.toMono(typeReference), mono -> (Mono)mono)); } @@ -429,17 +429,30 @@ class DefaultWebClient implements WebClient { return mono.flatMap(Mono::error); } + private Mono bodyToMono(ClientResponse response, + BodyExtractor, ? super ClientHttpResponse> extractor, + Function, Mono> errorFunction) { + + return this.statusHandlers.stream() + .filter(statusHandler -> statusHandler.test(response.statusCode())) + .findFirst() + .map(statusHandler -> statusHandler.apply(response)) + .map(errorFunction::apply) + .orElse(response.body(extractor)) + .doAfterTerminate(response::close); + } + @Override public Flux bodyToFlux(Class elementType) { return this.responseMono.flatMapMany( - response -> bodyToPublisher(response, BodyExtractors.toFlux(elementType), + response -> bodyToFlux(response, BodyExtractors.toFlux(elementType), this::monoThrowableToFlux)); } @Override public Flux bodyToFlux(ParameterizedTypeReference typeReference) { return this.responseMono.flatMapMany( - response -> bodyToPublisher(response, BodyExtractors.toFlux(typeReference), + response -> bodyToFlux(response, BodyExtractors.toFlux(typeReference), this::monoThrowableToFlux)); } @@ -447,16 +460,17 @@ class DefaultWebClient implements WebClient { return mono.flatMapMany(Flux::error); } - private > T bodyToPublisher(ClientResponse response, - BodyExtractor extractor, - Function, T> errorFunction) { + private Flux bodyToFlux(ClientResponse response, + BodyExtractor, ? super ClientHttpResponse> extractor, + Function, Flux> errorFunction) { return this.statusHandlers.stream() .filter(statusHandler -> statusHandler.test(response.statusCode())) .findFirst() .map(statusHandler -> statusHandler.apply(response)) .map(errorFunction::apply) - .orElse(response.body(extractor)); + .orElse(response.body(extractor)) + .doAfterTerminate(response::close); } private static Mono createResponseException(ClientResponse response) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java index 3122d94d70..eec6489270 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java @@ -461,6 +461,17 @@ public interface WebClient { * .exchange() * .flatMapMany(response -> response.bodyToFlux(Pojo.class)); * + *

If the response body is not consumed with {@code bodyTo*} + * or {@code toEntity*} methods, it is your responsibility + * to release the HTTP resources with {@link ClientResponse#close()}. + *

+		 * Mono<HttpStatus> mono = client.get().uri("/")
+		 *     .exchange()
+		 *     .map(response -> {
+		 *         response.close();
+		 *         return response.statusCode();
+		 *     });
+		 * 
* @return a {@code Mono} with the response * @see #retrieve() */ diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index adf1671c79..ddd36837e1 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -70,6 +70,12 @@ public class WebClientIntegrationTests { public void headers() throws Exception { this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!")); + this.webClient.get().uri("/test") + .exchange() + .map(response -> { + response.close(); + return response.statusCode(); + }); Mono result = this.webClient.get() .uri("/greeting?name=Spring") .exchange() diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java new file mode 100644 index 0000000000..fabe7abce7 --- /dev/null +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java @@ -0,0 +1,116 @@ +package org.springframework.web.reactive.function.client; + +import org.junit.Before; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.mock.http.client.reactive.test.MockClientHttpResponse; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +/** + * Mock tests using a {@link ExchangeFunction} through {@link WebClient}. + * + * @author Brian Clozel + */ +public class WebClientMockTests { + + private MockClientHttpResponse response; + + private ClientHttpConnector mockConnector; + + private WebClient webClient; + + @Before + public void setUp() throws Exception { + this.mockConnector = mock(ClientHttpConnector.class); + this.webClient = WebClient.builder().clientConnector(this.mockConnector).build(); + this.response = new MockClientHttpResponse(HttpStatus.OK); + this.response.setBody("example"); + + given(this.mockConnector.connect(any(), any(), any())).willReturn(Mono.just(this.response)); + } + + @Test + public void shouldDisposeResponseManually() { + Mono headers= this.webClient + .get().uri("/test") + .exchange() + .map(response -> response.headers().asHttpHeaders()); + StepVerifier.create(headers) + .expectNextCount(1) + .verifyComplete(); + assertFalse(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseExchangeMono() { + Mono body = this.webClient + .get().uri("/test") + .exchange() + .flatMap(response -> response.bodyToMono(String.class)); + StepVerifier.create(body) + .expectNext("example") + .verifyComplete(); + assertTrue(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseExchangeFlux() { + Flux body = this.webClient + .get().uri("/test") + .exchange() + .flatMapMany(response -> response.bodyToFlux(String.class)); + StepVerifier.create(body) + .expectNext("example") + .verifyComplete(); + assertTrue(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseExchangeEntity() { + ResponseEntity entity = this.webClient + .get().uri("/test") + .exchange() + .flatMap(response -> response.toEntity(String.class)) + .block(); + assertEquals("example", entity.getBody()); + assertTrue(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseRetrieveMono() { + Mono body = this.webClient + .get().uri("/test") + .retrieve() + .bodyToMono(String.class); + StepVerifier.create(body) + .expectNext("example") + .verifyComplete(); + assertTrue(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseRetrieveFlux() { + Flux body = this.webClient + .get().uri("/test") + .retrieve() + .bodyToFlux(String.class); + StepVerifier.create(body) + .expectNext("example") + .verifyComplete(); + assertTrue(this.response.isClosed()); + } + +}