Add close() method on HTTP client response

Before this commit, there was no way to signal the HTTP client that we
were done consuming the response. Without that, the underlying client
library cannot know when it is safe to release the associated resources
(e.g. the HTTP connection).

This commit adds new `close()` methods on both `ClientHttpResponse`
and `ClientResponse`. This methods is non-blocking and its behavior
depends on the library, its configuration, HTTP version, etc.

At the `WebClient` level, `close()` is called automatically if we
consume the response body through the `ResponseSpec` or the
`ClientResponse` itself.

Note that it is *required* to call `close()` manually otherwise; not
doing so might create resource leaks or connection issues.

Issue: SPR-15920
This commit is contained in:
Brian Clozel 2017-09-06 14:06:56 +02:00
parent ba6b617bd5
commit 16f3f8d28f
11 changed files with 238 additions and 16 deletions

View File

@ -55,6 +55,8 @@ public class MockClientHttpResponse implements ClientHttpResponse {
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private boolean closed = false;
public MockClientHttpResponse(HttpStatus status) {
Assert.notNull(status, "HttpStatus is required");
@ -94,10 +96,23 @@ public class MockClientHttpResponse implements ClientHttpResponse {
return this.bufferFactory.wrap(byteBuffer);
}
@Override
public Flux<DataBuffer> getBody() {
if (this.closed) {
return Flux.error(new IllegalStateException("Connection has been closed."));
}
return this.body;
}
@Override
public void close() {
this.closed = true;
}
public boolean isClosed() {
return this.closed;
}
/**
* Return the response body aggregated and converted to a String using the
* charset of the Content-Type response or otherwise as "UTF-8".

View File

@ -16,6 +16,8 @@
package org.springframework.http.client.reactive;
import java.io.Closeable;
import org.springframework.http.HttpStatus;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ResponseCookie;
@ -25,9 +27,10 @@ import org.springframework.util.MultiValueMap;
* Represents a client-side reactive HTTP response.
*
* @author Arjen Poutsma
* @author Brian Clozel
* @since 5.0
*/
public interface ClientHttpResponse extends ReactiveHttpInputMessage {
public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable {
/**
* Return the HTTP status as an {@link HttpStatus} enum value.
@ -39,4 +42,15 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage {
*/
MultiValueMap<String, ResponseCookie> getCookies();
/**
* Close this response, freeing any resources created.
* <p>This non-blocking method has to be called once the response has been
* processed and the resources are no longer needed; not doing so might
* create resource leaks or connection issues.
* <p>Depending on the client configuration and HTTP version,
* this can lead to closing the connection or returning it to a connection pool.
*/
@Override
void close();
}

View File

@ -70,6 +70,10 @@ public class ClientHttpResponseDecorator implements ClientHttpResponse {
return this.delegate.getBody();
}
@Override
public void close() {
this.delegate.close();
}
@Override
public String toString() {

View File

@ -89,6 +89,10 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
return CollectionUtils.unmodifiableMultiValueMap(result);
}
@Override
public void close() {
this.response.dispose();
}
@Override
public String toString() {

View File

@ -55,6 +55,7 @@ public class MockClientHttpResponse implements ClientHttpResponse {
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private boolean closed = false;
public MockClientHttpResponse(HttpStatus status) {
Assert.notNull(status, "HttpStatus is required");
@ -62,6 +63,7 @@ public class MockClientHttpResponse implements ClientHttpResponse {
}
@Override
public HttpStatus getStatusCode() {
return this.status;
}
@ -71,6 +73,7 @@ public class MockClientHttpResponse implements ClientHttpResponse {
return this.headers;
}
@Override
public MultiValueMap<String, ResponseCookie> getCookies() {
return this.cookies;
}
@ -94,10 +97,23 @@ public class MockClientHttpResponse implements ClientHttpResponse {
return this.bufferFactory.wrap(byteBuffer);
}
@Override
public Flux<DataBuffer> getBody() {
if (this.closed) {
return Flux.error(new IllegalStateException("Connection has been closed."));
}
return this.body;
}
@Override
public void close() {
this.closed = true;
}
public boolean isClosed() {
return this.closed;
}
/**
* Return the response body aggregated and converted to a String using the
* charset of the Content-Type response or otherwise as "UTF-8".

View File

@ -16,6 +16,7 @@
package org.springframework.web.reactive.function.client;
import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
@ -43,7 +44,7 @@ import org.springframework.web.reactive.function.BodyExtractor;
* @author Arjen Poutsma
* @since 5.0
*/
public interface ClientResponse {
public interface ClientResponse extends Closeable {
/**
* Return the status code of this response.
@ -132,6 +133,18 @@ public interface ClientResponse {
*/
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> typeReference);
/**
* Close this response, freeing any resources created.
* <p>This non-blocking method has to be called once the response has been processed
* and the resources are no longer needed.
* <p>{@code ClientResponse.bodyTo*}, {@code ClientResponse.toEntity*}
* and all methods under {@code WebClient.retrieve()} will close the response
* automatically.
* <p>It is required to call close() manually otherwise; not doing so might
* create resource leaks or connection issues.
*/
@Override
void close();
/**
* Represents the headers of the HTTP response.

View File

@ -42,6 +42,7 @@ import org.springframework.web.reactive.function.BodyExtractors;
* Default implementation of {@link ClientResponse}.
*
* @author Arjen Poutsma
* @author Brian Clozel
* @since 5.0
*/
class DefaultClientResponse implements ClientResponse {
@ -97,22 +98,24 @@ class DefaultClientResponse implements ClientResponse {
@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
return body(BodyExtractors.toMono(elementClass));
Mono<T> body = body(BodyExtractors.toMono(elementClass));
return body.doOnTerminate(this.response::close);
}
@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
return body(BodyExtractors.toMono(typeReference));
return body(BodyExtractors.toMono(typeReference)).doOnTerminate(this.response::close);
}
@Override
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
return body(BodyExtractors.toFlux(elementClass));
Flux<T> body = body(BodyExtractors.toFlux(elementClass));
return body.doOnTerminate(this.response::close);
}
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
return body(BodyExtractors.toFlux(typeReference));
return body(BodyExtractors.toFlux(typeReference)).doOnTerminate(this.response::close);
}
@Override
@ -131,7 +134,8 @@ class DefaultClientResponse implements ClientResponse {
return bodyMono
.map(body -> new ResponseEntity<>(body, headers, statusCode))
.switchIfEmpty(Mono.defer(
() -> Mono.just(new ResponseEntity<>(headers, statusCode))));
() -> Mono.just(new ResponseEntity<>(headers, statusCode))))
.doOnTerminate(this.response::close);
}
@Override
@ -150,9 +154,14 @@ class DefaultClientResponse implements ClientResponse {
HttpStatus statusCode = statusCode();
return bodyFlux
.collectList()
.map(body -> new ResponseEntity<>(body, headers, statusCode));
.map(body -> new ResponseEntity<>(body, headers, statusCode))
.doOnTerminate(this.response::close);
}
@Override
public void close() {
this.response.close();
}
private class DefaultHeaders implements Headers {

View File

@ -413,7 +413,7 @@ class DefaultWebClient implements WebClient {
@SuppressWarnings("unchecked")
public <T> Mono<T> bodyToMono(Class<T> bodyType) {
return this.responseMono.flatMap(
response -> bodyToPublisher(response, BodyExtractors.toMono(bodyType),
response -> bodyToMono(response, BodyExtractors.toMono(bodyType),
this::monoThrowableToMono));
}
@ -421,7 +421,7 @@ class DefaultWebClient implements WebClient {
@SuppressWarnings("unchecked")
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
return this.responseMono.flatMap(
response -> bodyToPublisher(response, BodyExtractors.toMono(typeReference),
response -> bodyToMono(response, BodyExtractors.toMono(typeReference),
mono -> (Mono<T>)mono));
}
@ -429,17 +429,30 @@ class DefaultWebClient implements WebClient {
return mono.flatMap(Mono::error);
}
private <T> Mono<T> bodyToMono(ClientResponse response,
BodyExtractor<Mono<T>, ? super ClientHttpResponse> extractor,
Function<Mono<? extends Throwable>, Mono<T>> errorFunction) {
return this.statusHandlers.stream()
.filter(statusHandler -> statusHandler.test(response.statusCode()))
.findFirst()
.map(statusHandler -> statusHandler.apply(response))
.map(errorFunction::apply)
.orElse(response.body(extractor))
.doAfterTerminate(response::close);
}
@Override
public <T> Flux<T> bodyToFlux(Class<T> elementType) {
return this.responseMono.flatMapMany(
response -> bodyToPublisher(response, BodyExtractors.toFlux(elementType),
response -> bodyToFlux(response, BodyExtractors.toFlux(elementType),
this::monoThrowableToFlux));
}
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
return this.responseMono.flatMapMany(
response -> bodyToPublisher(response, BodyExtractors.toFlux(typeReference),
response -> bodyToFlux(response, BodyExtractors.toFlux(typeReference),
this::monoThrowableToFlux));
}
@ -447,16 +460,17 @@ class DefaultWebClient implements WebClient {
return mono.flatMapMany(Flux::error);
}
private <T extends Publisher<?>> T bodyToPublisher(ClientResponse response,
BodyExtractor<T, ? super ClientHttpResponse> extractor,
Function<Mono<? extends Throwable>, T> errorFunction) {
private <T> Flux<T> bodyToFlux(ClientResponse response,
BodyExtractor<Flux<T>, ? super ClientHttpResponse> extractor,
Function<Mono<? extends Throwable>, Flux<T>> errorFunction) {
return this.statusHandlers.stream()
.filter(statusHandler -> statusHandler.test(response.statusCode()))
.findFirst()
.map(statusHandler -> statusHandler.apply(response))
.map(errorFunction::apply)
.orElse(response.body(extractor));
.orElse(response.body(extractor))
.doAfterTerminate(response::close);
}
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {

View File

@ -461,6 +461,17 @@ public interface WebClient {
* .exchange()
* .flatMapMany(response -> response.bodyToFlux(Pojo.class));
* </pre>
* <p>If the response body is not consumed with {@code bodyTo*}
* or {@code toEntity*} methods, it is your responsibility
* to release the HTTP resources with {@link ClientResponse#close()}.
* <pre>
* Mono&lt;HttpStatus&gt; mono = client.get().uri("/")
* .exchange()
* .map(response -> {
* response.close();
* return response.statusCode();
* });
* </pre>
* @return a {@code Mono} with the response
* @see #retrieve()
*/

View File

@ -70,6 +70,12 @@ public class WebClientIntegrationTests {
public void headers() throws Exception {
this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
this.webClient.get().uri("/test")
.exchange()
.map(response -> {
response.close();
return response.statusCode();
});
Mono<HttpHeaders> result = this.webClient.get()
.uri("/greeting?name=Spring")
.exchange()

View File

@ -0,0 +1,116 @@
package org.springframework.web.reactive.function.client;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.mock.http.client.reactive.test.MockClientHttpResponse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
/**
* Mock tests using a {@link ExchangeFunction} through {@link WebClient}.
*
* @author Brian Clozel
*/
public class WebClientMockTests {
private MockClientHttpResponse response;
private ClientHttpConnector mockConnector;
private WebClient webClient;
@Before
public void setUp() throws Exception {
this.mockConnector = mock(ClientHttpConnector.class);
this.webClient = WebClient.builder().clientConnector(this.mockConnector).build();
this.response = new MockClientHttpResponse(HttpStatus.OK);
this.response.setBody("example");
given(this.mockConnector.connect(any(), any(), any())).willReturn(Mono.just(this.response));
}
@Test
public void shouldDisposeResponseManually() {
Mono<HttpHeaders> headers= this.webClient
.get().uri("/test")
.exchange()
.map(response -> response.headers().asHttpHeaders());
StepVerifier.create(headers)
.expectNextCount(1)
.verifyComplete();
assertFalse(this.response.isClosed());
}
@Test
public void shouldDisposeResponseExchangeMono() {
Mono<String> body = this.webClient
.get().uri("/test")
.exchange()
.flatMap(response -> response.bodyToMono(String.class));
StepVerifier.create(body)
.expectNext("example")
.verifyComplete();
assertTrue(this.response.isClosed());
}
@Test
public void shouldDisposeResponseExchangeFlux() {
Flux<String> body = this.webClient
.get().uri("/test")
.exchange()
.flatMapMany(response -> response.bodyToFlux(String.class));
StepVerifier.create(body)
.expectNext("example")
.verifyComplete();
assertTrue(this.response.isClosed());
}
@Test
public void shouldDisposeResponseExchangeEntity() {
ResponseEntity<String> entity = this.webClient
.get().uri("/test")
.exchange()
.flatMap(response -> response.toEntity(String.class))
.block();
assertEquals("example", entity.getBody());
assertTrue(this.response.isClosed());
}
@Test
public void shouldDisposeResponseRetrieveMono() {
Mono<String> body = this.webClient
.get().uri("/test")
.retrieve()
.bodyToMono(String.class);
StepVerifier.create(body)
.expectNext("example")
.verifyComplete();
assertTrue(this.response.isClosed());
}
@Test
public void shouldDisposeResponseRetrieveFlux() {
Flux<String> body = this.webClient
.get().uri("/test")
.retrieve()
.bodyToFlux(String.class);
StepVerifier.create(body)
.expectNext("example")
.verifyComplete();
assertTrue(this.response.isClosed());
}
}