Refactor ReactorClientHttpRequestFactory timeouts
Closes gh-33782
This commit is contained in:
parent
044da794f4
commit
4749d810db
|
@ -53,19 +53,50 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
|
|||
|
||||
private final URI uri;
|
||||
|
||||
@Nullable
|
||||
private final Duration exchangeTimeout;
|
||||
|
||||
private final Duration readTimeout;
|
||||
|
||||
/**
|
||||
* Create an instance.
|
||||
* @param httpClient the client to perform the request with
|
||||
* @param method the HTTP method
|
||||
* @param uri the URI for the request
|
||||
* @since 6.2
|
||||
*/
|
||||
public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri) {
|
||||
this.httpClient = httpClient;
|
||||
this.method = method;
|
||||
this.uri = uri;
|
||||
this.exchangeTimeout = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Package private constructor for use until exchangeTimeout is removed.
|
||||
*/
|
||||
ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Duration exchangeTimeout) {
|
||||
this.httpClient = httpClient;
|
||||
this.method = method;
|
||||
this.uri = uri;
|
||||
this.exchangeTimeout = exchangeTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Original constructor with timeout values.
|
||||
* @deprecated without a replacement; readTimeout is now applied to the
|
||||
* underlying client via {@link HttpClient#responseTimeout(Duration)}, and the
|
||||
* value passed here is not used; exchangeTimeout is deprecated and superseded
|
||||
* by Reactor Netty timeout configuration, but applied if set.
|
||||
*/
|
||||
@Deprecated(since = "6.2", forRemoval = true)
|
||||
public ReactorClientHttpRequest(
|
||||
HttpClient httpClient, URI uri, HttpMethod method, Duration exchangeTimeout, Duration readTimeout) {
|
||||
HttpClient httpClient, URI uri, HttpMethod method,
|
||||
@Nullable Duration exchangeTimeout, @Nullable Duration readTimeout) {
|
||||
|
||||
this.httpClient = httpClient;
|
||||
this.method = method;
|
||||
this.uri = uri;
|
||||
this.exchangeTimeout = exchangeTimeout;
|
||||
this.readTimeout = readTimeout;
|
||||
}
|
||||
|
||||
|
||||
|
@ -89,18 +120,19 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
|
|||
sender = (this.uri.isAbsolute() ? sender.uri(this.uri) : sender.uri(this.uri.toString()));
|
||||
|
||||
try {
|
||||
ReactorClientHttpResponse result =
|
||||
Mono<ReactorClientHttpResponse> mono =
|
||||
sender.send((request, outbound) -> send(headers, body, request, outbound))
|
||||
.responseConnection((response, conn) ->
|
||||
Mono.just(new ReactorClientHttpResponse(response, conn, this.readTimeout)))
|
||||
.next()
|
||||
.block(this.exchangeTimeout);
|
||||
.responseConnection((response, conn) -> Mono.just(new ReactorClientHttpResponse(response, conn)))
|
||||
.next();
|
||||
|
||||
if (result == null) {
|
||||
ReactorClientHttpResponse clientResponse =
|
||||
(this.exchangeTimeout != null ? mono.block(this.exchangeTimeout) : mono.block());
|
||||
|
||||
if (clientResponse == null) {
|
||||
throw new IOException("HTTP exchange resulted in no result");
|
||||
}
|
||||
|
||||
return result;
|
||||
return clientResponse;
|
||||
}
|
||||
catch (RuntimeException ex) {
|
||||
throw convertException(ex);
|
||||
|
|
|
@ -48,7 +48,8 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
|
|||
|
||||
private static final Log logger = LogFactory.getLog(ReactorClientHttpRequestFactory.class);
|
||||
|
||||
private static final Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true);
|
||||
private static final Function<HttpClient, HttpClient> defaultInitializer =
|
||||
client -> client.compress(true).responseTimeout(Duration.ofSeconds(10));
|
||||
|
||||
|
||||
@Nullable
|
||||
|
@ -60,9 +61,11 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
|
|||
@Nullable
|
||||
private Integer connectTimeout;
|
||||
|
||||
private Duration readTimeout = Duration.ofSeconds(10);
|
||||
@Nullable
|
||||
private Duration readTimeout;
|
||||
|
||||
private Duration exchangeTimeout = Duration.ofSeconds(5);
|
||||
@Nullable
|
||||
private Duration exchangeTimeout;
|
||||
|
||||
@Nullable
|
||||
private volatile HttpClient httpClient;
|
||||
|
@ -120,12 +123,15 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
|
|||
if (this.connectTimeout != null) {
|
||||
client = client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
|
||||
}
|
||||
if (this.readTimeout != null) {
|
||||
client = client.responseTimeout(this.readTimeout);
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the underlying connect timeout value on the underlying client.
|
||||
* Set the connect timeout value on the underlying client.
|
||||
* Effectively, a shortcut for
|
||||
* {@code httpClient.option(CONNECT_TIMEOUT_MILLIS, timeout)}.
|
||||
* <p>By default, set to 30 seconds.
|
||||
|
@ -152,35 +158,53 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the underlying read timeout in milliseconds.
|
||||
* <p>Default is 10 seconds.
|
||||
* Set the read timeout value on the underlying client.
|
||||
* Effectively, a shortcut for {@link HttpClient#responseTimeout(Duration)}.
|
||||
* <p>By default, set to 10 seconds.
|
||||
* @param timeout the read timeout value in millis; must be > 0.
|
||||
*/
|
||||
public void setReadTimeout(long readTimeout) {
|
||||
Assert.isTrue(readTimeout > 0, "Timeout must be a positive value");
|
||||
this.readTimeout = Duration.ofMillis(readTimeout);
|
||||
public void setReadTimeout(Duration timeout) {
|
||||
Assert.notNull(timeout, "ReadTimeout must not be null");
|
||||
Assert.isTrue(timeout.toMillis() > 0, "Timeout must be a positive value");
|
||||
this.readTimeout = timeout;
|
||||
HttpClient httpClient = this.httpClient;
|
||||
if (httpClient != null) {
|
||||
this.httpClient = httpClient.responseTimeout(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Variant of {@link #setConnectTimeout(int)} with a {@link Duration} value.
|
||||
* Variant of {@link #setReadTimeout(Duration)} with a long value.
|
||||
*/
|
||||
public void setReadTimeout(Duration readTimeout) {
|
||||
Assert.notNull(readTimeout, "ReadTimeout must not be null");
|
||||
setReadTimeout((int) readTimeout.toMillis());
|
||||
public void setReadTimeout(long readTimeout) {
|
||||
setReadTimeout(Duration.ofMillis(readTimeout));
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the timeout for the HTTP exchange in milliseconds.
|
||||
* <p>Default is 5 seconds.
|
||||
* <p>By default, as of 6.2 this is no longer set.
|
||||
* @see #setConnectTimeout(int)
|
||||
* @see #setReadTimeout(Duration)
|
||||
* @see <a href="https://projectreactor.io/docs/netty/release/reference/index.html#timeout-configuration">Timeout Configuration</a>
|
||||
* @deprecated as of 6.2 and no longer set by default (previously 5 seconds)
|
||||
* in favor of using Reactor Netty HttpClient timeout configuration.
|
||||
*/
|
||||
@Deprecated(since = "6.2", forRemoval = true)
|
||||
public void setExchangeTimeout(long exchangeTimeout) {
|
||||
Assert.isTrue(exchangeTimeout > 0, "Timeout must be a positive value");
|
||||
this.exchangeTimeout = Duration.ofMillis(exchangeTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the timeout for the HTTP exchange.
|
||||
* <p>Default is 5 seconds.
|
||||
* Variant of {@link #setExchangeTimeout(long)} with a Duration value.
|
||||
* <p>By default, as of 6.2 this is no longer set.
|
||||
* @see #setConnectTimeout(int)
|
||||
* @see #setReadTimeout(Duration)
|
||||
* @see <a href="https://projectreactor.io/docs/netty/release/reference/index.html#timeout-configuration">Timeout Configuration</a>
|
||||
* @deprecated as of 6.2 and no longer set by default (previously 5 seconds)
|
||||
* in favor of using Reactor Netty HttpClient timeout configuration.
|
||||
*/
|
||||
@Deprecated(since = "6.2", forRemoval = true)
|
||||
public void setExchangeTimeout(Duration exchangeTimeout) {
|
||||
Assert.notNull(exchangeTimeout, "ExchangeTimeout must not be null");
|
||||
setExchangeTimeout((int) exchangeTimeout.toMillis());
|
||||
|
@ -195,8 +219,7 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
|
|||
"Expected HttpClient or ResourceFactory and mapper");
|
||||
client = createHttpClient(this.resourceFactory, this.mapper);
|
||||
}
|
||||
return new ReactorClientHttpRequest(
|
||||
client, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
|
||||
return new ReactorClientHttpRequest(client, httpMethod, uri, this.exchangeTimeout);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.time.Duration;
|
|||
import io.netty.buffer.ByteBuf;
|
||||
import org.reactivestreams.FlowAdapters;
|
||||
import reactor.netty.Connection;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.http.client.HttpClientResponse;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
|
@ -46,20 +47,36 @@ final class ReactorClientHttpResponse implements ClientHttpResponse {
|
|||
|
||||
private final HttpHeaders headers;
|
||||
|
||||
private final Duration readTimeout;
|
||||
|
||||
@Nullable
|
||||
private volatile InputStream body;
|
||||
|
||||
|
||||
/**
|
||||
* Create a response instance.
|
||||
* @param response the Reactor Netty response
|
||||
* @param connection the connection for the exchange
|
||||
* @since 6.2
|
||||
*/
|
||||
public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) {
|
||||
this.response = response;
|
||||
this.connection = connection;
|
||||
this.headers = HttpHeaders.readOnlyHttpHeaders(
|
||||
new Netty4HeadersAdapter(response.responseHeaders()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Original constructor.
|
||||
* @deprecated without a replacement; readTimeout is now applied to the
|
||||
* underlying client via {@link HttpClient#responseTimeout(Duration)}, and the
|
||||
* value passed here is not used.
|
||||
*/
|
||||
@Deprecated(since = "6.2", forRemoval = true)
|
||||
public ReactorClientHttpResponse(
|
||||
HttpClientResponse response, Connection connection, Duration readTimeout) {
|
||||
HttpClientResponse response, Connection connection, @Nullable Duration readTimeout) {
|
||||
|
||||
this.response = response;
|
||||
this.connection = connection;
|
||||
this.readTimeout = readTimeout;
|
||||
this.headers = HttpHeaders.readOnlyHttpHeaders(
|
||||
new Netty4HeadersAdapter(response.responseHeaders()));
|
||||
this.headers = HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(response.responseHeaders()));
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue