Defensive checks in WebClient and Reactor connector
Since there is no reason for an exchange to ever complete without a ClientResponse I've added a switchIfEmpty check at the WebClient level. Also, temporarily a second check closer to the problem in the ReactorClientHttpConnector suggesting a workaround and providing a reference to the Reactor Netty issue #138. Issue: SPR-15784
This commit is contained in:
parent
56903581d9
commit
43f2de4671
|
|
@ -23,6 +23,8 @@ import java.util.function.Function;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.ipc.netty.http.client.HttpClient;
|
import reactor.ipc.netty.http.client.HttpClient;
|
||||||
import reactor.ipc.netty.http.client.HttpClientOptions;
|
import reactor.ipc.netty.http.client.HttpClientOptions;
|
||||||
|
import reactor.ipc.netty.http.client.HttpClientRequest;
|
||||||
|
import reactor.ipc.netty.http.client.HttpClientResponse;
|
||||||
import reactor.ipc.netty.options.ClientOptions;
|
import reactor.ipc.netty.options.ClientOptions;
|
||||||
|
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
|
|
@ -36,6 +38,12 @@ import org.springframework.http.HttpMethod;
|
||||||
*/
|
*/
|
||||||
public class ReactorClientHttpConnector implements ClientHttpConnector {
|
public class ReactorClientHttpConnector implements ClientHttpConnector {
|
||||||
|
|
||||||
|
private static final Mono<ClientHttpResponse> NO_CLIENT_RESPONSE_ERROR = Mono.error(
|
||||||
|
new IllegalStateException("HttpClient completed without a response. " +
|
||||||
|
"As a temporary workaround try to disable connection pool. " +
|
||||||
|
"See https://github.com/reactor/reactor-netty/issues/138."));
|
||||||
|
|
||||||
|
|
||||||
private final HttpClient httpClient;
|
private final HttpClient httpClient;
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -61,11 +69,23 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
|
||||||
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
|
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
|
||||||
|
|
||||||
return this.httpClient
|
return this.httpClient
|
||||||
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()),
|
.request(adaptHttpMethod(method),
|
||||||
uri.toString(),
|
uri.toString(),
|
||||||
httpClientRequest -> requestCallback
|
request -> requestCallback.apply(adaptRequest(method, uri, request)))
|
||||||
.apply(new ReactorClientHttpRequest(method, uri, httpClientRequest)))
|
.map(this::adaptResponse)
|
||||||
.map(ReactorClientHttpResponse::new);
|
.switchIfEmpty(NO_CLIENT_RESPONSE_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) {
|
||||||
|
return io.netty.handler.codec.http.HttpMethod.valueOf(method.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request) {
|
||||||
|
return new ReactorClientHttpRequest(method, uri, request);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ClientHttpResponse adaptResponse(HttpClientResponse response) {
|
||||||
|
return new ReactorClientHttpResponse(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -63,6 +63,10 @@ import org.springframework.web.util.UriBuilderFactory;
|
||||||
*/
|
*/
|
||||||
class DefaultWebClient implements WebClient {
|
class DefaultWebClient implements WebClient {
|
||||||
|
|
||||||
|
private static final Mono<ClientResponse> NO_HTTP_CLIENT_RESPONSE_ERROR = Mono.error(
|
||||||
|
new IllegalStateException("The underlying HTTP client completed without emitting a response."));
|
||||||
|
|
||||||
|
|
||||||
private final ExchangeFunction exchangeFunction;
|
private final ExchangeFunction exchangeFunction;
|
||||||
|
|
||||||
private final UriBuilderFactory uriBuilderFactory;
|
private final UriBuilderFactory uriBuilderFactory;
|
||||||
|
|
@ -309,7 +313,7 @@ class DefaultWebClient implements WebClient {
|
||||||
ClientRequest request = (this.inserter != null ?
|
ClientRequest request = (this.inserter != null ?
|
||||||
initRequestBuilder().body(this.inserter).build() :
|
initRequestBuilder().body(this.inserter).build() :
|
||||||
initRequestBuilder().build());
|
initRequestBuilder().build());
|
||||||
return exchangeFunction.exchange(request);
|
return exchangeFunction.exchange(request).switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClientRequest.Builder initRequestBuilder() {
|
private ClientRequest.Builder initRequestBuilder() {
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package org.springframework.web.reactive.function.client;
|
package org.springframework.web.reactive.function.client;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
@ -25,12 +26,15 @@ import org.mockito.Captor;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.MockitoAnnotations;
|
import org.mockito.MockitoAnnotations;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.test.StepVerifier;
|
||||||
|
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for {@link DefaultWebClient}.
|
* Unit tests for {@link DefaultWebClient}.
|
||||||
|
|
@ -160,7 +164,7 @@ public class DefaultWebClientTests {
|
||||||
@Test
|
@Test
|
||||||
public void apply() {
|
public void apply() {
|
||||||
WebClient client = builder()
|
WebClient client = builder()
|
||||||
.apply(builder -> builder.defaultHeader("Accept", "application/json").defaultCookie("id", "123"))
|
.apply(builder -> builder.defaultHeader("Accept", "application/json").defaultCookie("id", "123"))
|
||||||
.build();
|
.build();
|
||||||
client.get().uri("/path").exchange();
|
client.get().uri("/path").exchange();
|
||||||
|
|
||||||
|
|
@ -170,6 +174,12 @@ public class DefaultWebClientTests {
|
||||||
verifyNoMoreInteractions(this.exchangeFunction);
|
verifyNoMoreInteractions(this.exchangeFunction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void switchToErrorOnEmptyClientResponseMono() throws Exception {
|
||||||
|
StepVerifier.create(builder().build().get().uri("/path").exchange())
|
||||||
|
.expectErrorMessage("The underlying HTTP client completed without emitting a response.")
|
||||||
|
.verify(Duration.ofSeconds(5));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private WebClient.Builder builder() {
|
private WebClient.Builder builder() {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue