Revise and align Reactor client lifecycle management

Closes gh-32945
This commit is contained in:
Juergen Hoeller 2024-06-05 16:32:40 +02:00
parent 4f6f2c0d41
commit 7785f94c4c
5 changed files with 133 additions and 121 deletions

View File

@ -50,19 +50,21 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
private static final Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true);
private HttpClient httpClient;
@Nullable
private final ReactorResourceFactory resourceFactory;
@Nullable
private final Function<HttpClient, HttpClient> mapper;
private Duration exchangeTimeout = Duration.ofSeconds(5);
@Nullable
private Integer connectTimeout;
private Duration readTimeout = Duration.ofSeconds(10);
private volatile boolean running = true;
private Duration exchangeTimeout = Duration.ofSeconds(5);
@Nullable
private volatile HttpClient httpClient;
private final Object lifecycleMonitor = new Object();
@ -107,25 +109,11 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
* @param mapper a mapper for further initialization of the created client
*/
public ReactorNettyClientRequestFactory(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
this.httpClient = createHttpClient(resourceFactory, mapper);
this.resourceFactory = resourceFactory;
this.mapper = mapper;
}
private static HttpClient createHttpClient(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
ConnectionProvider provider = resourceFactory.getConnectionProvider();
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
return defaultInitializer.andThen(mapper).andThen(applyLoopResources(resourceFactory))
.apply(HttpClient.create(provider));
}
private static Function<HttpClient, HttpClient> applyLoopResources(ReactorResourceFactory factory) {
return httpClient -> {
LoopResources resources = factory.getLoopResources();
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
return httpClient.runOn(resources);
};
if (resourceFactory.isRunning()) {
this.httpClient = createHttpClient(resourceFactory, mapper);
}
}
@ -138,7 +126,11 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
*/
public void setConnectTimeout(int connectTimeout) {
Assert.isTrue(connectTimeout >= 0, "Timeout must be a non-negative value");
this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
this.connectTimeout = connectTimeout;
HttpClient httpClient = this.httpClient;
if (httpClient != null) {
this.httpClient = httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
}
}
/**
@ -150,8 +142,7 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
*/
public void setConnectTimeout(Duration connectTimeout) {
Assert.notNull(connectTimeout, "ConnectTimeout must not be null");
Assert.isTrue(!connectTimeout.isNegative(), "Timeout must be a non-negative value");
this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)connectTimeout.toMillis());
setConnectTimeout((int) connectTimeout.toMillis());
}
/**
@ -192,52 +183,55 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
this.exchangeTimeout = exchangeTimeout;
}
private HttpClient createHttpClient(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper) {
HttpClient httpClient = defaultInitializer.andThen(mapper)
.apply(HttpClient.create(factory.getConnectionProvider()));
httpClient = httpClient.runOn(factory.getLoopResources());
if (this.connectTimeout != null) {
httpClient = httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
}
return httpClient;
}
@Override
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
HttpClient httpClient = this.httpClient;
if (httpClient == null) {
Assert.state(this.resourceFactory != null && this.mapper != null, "Illegal configuration");
httpClient = createHttpClient(this.resourceFactory, this.mapper);
}
return new ReactorNettyClientRequest(httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
}
@Override
public void start() {
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
if (this.resourceFactory != null && this.mapper != null) {
if (this.resourceFactory != null && this.mapper != null) {
synchronized (this.lifecycleMonitor) {
if (this.httpClient == null) {
this.httpClient = createHttpClient(this.resourceFactory, this.mapper);
}
else {
logger.warn("Restarting a ReactorNettyClientRequestFactory bean is only supported with externally managed Reactor Netty resources");
}
this.running = true;
}
}
else {
logger.warn("Restarting a ReactorNettyClientRequestFactory bean is only supported " +
"with externally managed Reactor Netty resources");
}
}
@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
this.running = false;
if (this.resourceFactory != null && this.mapper != null) {
synchronized (this.lifecycleMonitor) {
this.httpClient = null;
}
}
}
@Override
public final void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public boolean isAutoStartup() {
return false;
return (this.httpClient != null);
}
@Override

View File

@ -237,7 +237,7 @@ public class ReactorResourceFactory
@Override
public void start() {
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
if (!this.running) {
if (this.useGlobalResources) {
Assert.isTrue(this.loopResources == null && this.connectionProvider == null,
"'useGlobalResources' is mutually exclusive with explicitly configured resources");
@ -267,7 +267,7 @@ public class ReactorResourceFactory
@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
if (this.running) {
if (this.useGlobalResources) {
HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
this.connectionProvider = null;
@ -306,4 +306,10 @@ public class ReactorResourceFactory
return this.running;
}
@Override
public int getPhase() {
// Same as plain Lifecycle
return 0;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -54,24 +54,21 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
private static final Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true);
private HttpClient httpClient;
@Nullable
private final ReactorResourceFactory resourceFactory;
@Nullable
private final Function<HttpClient, HttpClient> mapper;
private volatile boolean running = true;
@Nullable
private volatile HttpClient httpClient;
private final Object lifecycleMonitor = new Object();
/**
* Default constructor. Initializes {@link HttpClient} via:
* <pre class="code">
* HttpClient.create().compress()
* </pre>
* <pre class="code">HttpClient.create().compress(true)</pre>
*/
public ReactorClientHttpConnector() {
this.httpClient = defaultInitializer.apply(HttpClient.create());
@ -79,6 +76,18 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
this.mapper = null;
}
/**
* Constructor with a pre-configured {@code HttpClient} instance.
* @param httpClient the client to use
* @since 5.1
*/
public ReactorClientHttpConnector(HttpClient httpClient) {
Assert.notNull(httpClient, "HttpClient is required");
this.httpClient = httpClient;
this.resourceFactory = null;
this.mapper = null;
}
/**
* Constructor with externally managed Reactor Netty resources, including
* {@link LoopResources} for event loop threads, and {@link ConnectionProvider}
@ -98,37 +107,16 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
* @since 5.1
*/
public ReactorClientHttpConnector(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
this.httpClient = createHttpClient(resourceFactory, mapper);
this.resourceFactory = resourceFactory;
this.mapper = mapper;
if (resourceFactory.isRunning()) {
this.httpClient = createHttpClient(resourceFactory, mapper);
}
}
private static HttpClient createHttpClient(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
ConnectionProvider provider = resourceFactory.getConnectionProvider();
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
return defaultInitializer.andThen(mapper).andThen(applyLoopResources(resourceFactory))
.apply(HttpClient.create(provider));
}
private static Function<HttpClient, HttpClient> applyLoopResources(ReactorResourceFactory factory) {
return httpClient -> {
LoopResources resources = factory.getLoopResources();
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
return httpClient.runOn(resources);
};
}
/**
* Constructor with a pre-configured {@code HttpClient} instance.
* @param httpClient the client to use
* @since 5.1
*/
public ReactorClientHttpConnector(HttpClient httpClient) {
Assert.notNull(httpClient, "HttpClient is required");
this.httpClient = httpClient;
this.resourceFactory = null;
this.mapper = null;
private static HttpClient createHttpClient(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper) {
return defaultInitializer.andThen(mapper).andThen(httpClient -> httpClient.runOn(factory.getLoopResources()))
.apply(HttpClient.create(factory.getConnectionProvider()));
}
@ -136,12 +124,17 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();
HttpClient httpClient = this.httpClient;
if (httpClient == null) {
Assert.state(this.resourceFactory != null && this.mapper != null, "Illegal configuration");
httpClient = createHttpClient(this.resourceFactory, this.mapper);
}
HttpClient.RequestSender requestSender = this.httpClient
HttpClient.RequestSender requestSender = httpClient
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()));
requestSender = setUri(requestSender, uri);
AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();
return requestSender
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
@ -176,46 +169,34 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
}
@Override
public void start() {
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
if (this.resourceFactory != null && this.mapper != null) {
if (this.resourceFactory != null && this.mapper != null) {
synchronized (this.lifecycleMonitor) {
if (this.httpClient == null) {
this.httpClient = createHttpClient(this.resourceFactory, this.mapper);
}
else {
logger.warn("Restarting a ReactorClientHttpConnector bean is only supported with externally managed Reactor Netty resources");
}
this.running = true;
}
}
else {
logger.warn("Restarting a ReactorClientHttpConnector bean is only supported " +
"with externally managed Reactor Netty resources");
}
}
@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
this.running = false;
if (this.resourceFactory != null && this.mapper != null) {
synchronized (this.lifecycleMonitor) {
this.httpClient = null;
}
}
}
@Override
public final void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public boolean isAutoStartup() {
return false;
return (this.httpClient != null);
}
@Override

View File

@ -51,7 +51,20 @@ class ReactorNettyClientRequestFactoryTests extends AbstractHttpRequestFactoryTe
requestFactory.start();
assertThat(requestFactory.isRunning()).isTrue();
requestFactory.stop();
assertThat(requestFactory.isRunning()).isFalse();
assertThat(requestFactory.isRunning()).isTrue();
requestFactory.start();
assertThat(requestFactory.isRunning()).isTrue();
}
@Test
void restartWithHttpClient() {
HttpClient httpClient = HttpClient.create();
ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(httpClient);
assertThat(requestFactory.isRunning()).isTrue();
requestFactory.start();
assertThat(requestFactory.isRunning()).isTrue();
requestFactory.stop();
assertThat(requestFactory.isRunning()).isTrue();
requestFactory.start();
assertThat(requestFactory.isRunning()).isTrue();
}
@ -72,10 +85,12 @@ class ReactorNettyClientRequestFactoryTests extends AbstractHttpRequestFactoryTe
}
@Test
void restartWithHttpClient() {
HttpClient httpClient = HttpClient.create();
ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(httpClient);
assertThat(requestFactory.isRunning()).isTrue();
void lateStartWithExternalResourceFactory() {
ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
Function<HttpClient, HttpClient> mapper = Function.identity();
ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(resourceFactory, mapper);
assertThat(requestFactory.isRunning()).isFalse();
resourceFactory.start();
requestFactory.start();
assertThat(requestFactory.isRunning()).isTrue();
requestFactory.stop();

View File

@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Sebastien Deleuze
* @since 6.1
*/
class ReactorClientHttpConnectorTests {
@ -37,7 +38,20 @@ class ReactorClientHttpConnectorTests {
connector.start();
assertThat(connector.isRunning()).isTrue();
connector.stop();
assertThat(connector.isRunning()).isFalse();
assertThat(connector.isRunning()).isTrue();
connector.start();
assertThat(connector.isRunning()).isTrue();
}
@Test
void restartWithHttpClient() {
HttpClient httpClient = HttpClient.create();
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
assertThat(connector.isRunning()).isTrue();
connector.start();
assertThat(connector.isRunning()).isTrue();
connector.stop();
assertThat(connector.isRunning()).isTrue();
connector.start();
assertThat(connector.isRunning()).isTrue();
}
@ -58,10 +72,12 @@ class ReactorClientHttpConnectorTests {
}
@Test
void restartWithHttpClient() {
HttpClient httpClient = HttpClient.create();
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
assertThat(connector.isRunning()).isTrue();
void lateStartWithExternalResourceFactory() {
ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
Function<HttpClient, HttpClient> mapper = Function.identity();
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(resourceFactory, mapper);
assertThat(connector.isRunning()).isFalse();
resourceFactory.start();
connector.start();
assertThat(connector.isRunning()).isTrue();
connector.stop();