From 7785f94c4c2b7a35d80c28f197c87c54fffc6564 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 5 Jun 2024 16:32:40 +0200 Subject: [PATCH] Revise and align Reactor client lifecycle management Closes gh-32945 --- .../ReactorNettyClientRequestFactory.java | 92 ++++++++-------- .../http/client/ReactorResourceFactory.java | 10 +- .../reactive/ReactorClientHttpConnector.java | 101 +++++++----------- ...ReactorNettyClientRequestFactoryTests.java | 25 ++++- .../ReactorClientHttpConnectorTests.java | 26 ++++- 5 files changed, 133 insertions(+), 121 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java index af29f7bce15..0a453d4f1a0 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java @@ -50,19 +50,21 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor private static final Function defaultInitializer = client -> client.compress(true); - private HttpClient httpClient; - @Nullable private final ReactorResourceFactory resourceFactory; @Nullable private final Function mapper; - private Duration exchangeTimeout = Duration.ofSeconds(5); + @Nullable + private Integer connectTimeout; private Duration readTimeout = Duration.ofSeconds(10); - private volatile boolean running = true; + private Duration exchangeTimeout = Duration.ofSeconds(5); + + @Nullable + private volatile HttpClient httpClient; private final Object lifecycleMonitor = new Object(); @@ -107,25 +109,11 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor * @param mapper a mapper for further initialization of the created client */ public ReactorNettyClientRequestFactory(ReactorResourceFactory resourceFactory, Function mapper) { - this.httpClient = createHttpClient(resourceFactory, mapper); this.resourceFactory = resourceFactory; this.mapper = mapper; - } - - - private static HttpClient createHttpClient(ReactorResourceFactory resourceFactory, Function mapper) { - ConnectionProvider provider = resourceFactory.getConnectionProvider(); - Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?"); - return defaultInitializer.andThen(mapper).andThen(applyLoopResources(resourceFactory)) - .apply(HttpClient.create(provider)); - } - - private static Function applyLoopResources(ReactorResourceFactory factory) { - return httpClient -> { - LoopResources resources = factory.getLoopResources(); - Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?"); - return httpClient.runOn(resources); - }; + if (resourceFactory.isRunning()) { + this.httpClient = createHttpClient(resourceFactory, mapper); + } } @@ -138,7 +126,11 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor */ public void setConnectTimeout(int connectTimeout) { Assert.isTrue(connectTimeout >= 0, "Timeout must be a non-negative value"); - this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); + this.connectTimeout = connectTimeout; + HttpClient httpClient = this.httpClient; + if (httpClient != null) { + this.httpClient = httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout); + } } /** @@ -150,8 +142,7 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor */ public void setConnectTimeout(Duration connectTimeout) { Assert.notNull(connectTimeout, "ConnectTimeout must not be null"); - Assert.isTrue(!connectTimeout.isNegative(), "Timeout must be a non-negative value"); - this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)connectTimeout.toMillis()); + setConnectTimeout((int) connectTimeout.toMillis()); } /** @@ -192,52 +183,55 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor this.exchangeTimeout = exchangeTimeout; } + private HttpClient createHttpClient(ReactorResourceFactory factory, Function mapper) { + HttpClient httpClient = defaultInitializer.andThen(mapper) + .apply(HttpClient.create(factory.getConnectionProvider())); + httpClient = httpClient.runOn(factory.getLoopResources()); + if (this.connectTimeout != null) { + httpClient = httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout); + } + return httpClient; + } + @Override public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException { - return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout); + HttpClient httpClient = this.httpClient; + if (httpClient == null) { + Assert.state(this.resourceFactory != null && this.mapper != null, "Illegal configuration"); + httpClient = createHttpClient(this.resourceFactory, this.mapper); + } + return new ReactorNettyClientRequest(httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout); } + @Override public void start() { - synchronized (this.lifecycleMonitor) { - if (!isRunning()) { - if (this.resourceFactory != null && this.mapper != null) { + if (this.resourceFactory != null && this.mapper != null) { + synchronized (this.lifecycleMonitor) { + if (this.httpClient == null) { this.httpClient = createHttpClient(this.resourceFactory, this.mapper); } - else { - logger.warn("Restarting a ReactorNettyClientRequestFactory bean is only supported with externally managed Reactor Netty resources"); - } - this.running = true; } } + else { + logger.warn("Restarting a ReactorNettyClientRequestFactory bean is only supported " + + "with externally managed Reactor Netty resources"); + } } @Override public void stop() { - synchronized (this.lifecycleMonitor) { - if (isRunning()) { - this.running = false; + if (this.resourceFactory != null && this.mapper != null) { + synchronized (this.lifecycleMonitor) { + this.httpClient = null; } } } - @Override - public final void stop(Runnable callback) { - synchronized (this.lifecycleMonitor) { - stop(); - callback.run(); - } - } - @Override public boolean isRunning() { - return this.running; - } - - @Override - public boolean isAutoStartup() { - return false; + return (this.httpClient != null); } @Override diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/ReactorResourceFactory.java index f6ca7af2f27..faed920cf66 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorResourceFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorResourceFactory.java @@ -237,7 +237,7 @@ public class ReactorResourceFactory @Override public void start() { synchronized (this.lifecycleMonitor) { - if (!isRunning()) { + if (!this.running) { if (this.useGlobalResources) { Assert.isTrue(this.loopResources == null && this.connectionProvider == null, "'useGlobalResources' is mutually exclusive with explicitly configured resources"); @@ -267,7 +267,7 @@ public class ReactorResourceFactory @Override public void stop() { synchronized (this.lifecycleMonitor) { - if (isRunning()) { + if (this.running) { if (this.useGlobalResources) { HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); this.connectionProvider = null; @@ -306,4 +306,10 @@ public class ReactorResourceFactory return this.running; } + @Override + public int getPhase() { + // Same as plain Lifecycle + return 0; + } + } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 5f1def633ab..b57221e8d06 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,24 +54,21 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif private static final Function defaultInitializer = client -> client.compress(true); - private HttpClient httpClient; - @Nullable private final ReactorResourceFactory resourceFactory; @Nullable private final Function mapper; - private volatile boolean running = true; + @Nullable + private volatile HttpClient httpClient; private final Object lifecycleMonitor = new Object(); /** * Default constructor. Initializes {@link HttpClient} via: - *
-	 * HttpClient.create().compress()
-	 * 
+ *
HttpClient.create().compress(true)
*/ public ReactorClientHttpConnector() { this.httpClient = defaultInitializer.apply(HttpClient.create()); @@ -79,6 +76,18 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif this.mapper = null; } + /** + * Constructor with a pre-configured {@code HttpClient} instance. + * @param httpClient the client to use + * @since 5.1 + */ + public ReactorClientHttpConnector(HttpClient httpClient) { + Assert.notNull(httpClient, "HttpClient is required"); + this.httpClient = httpClient; + this.resourceFactory = null; + this.mapper = null; + } + /** * Constructor with externally managed Reactor Netty resources, including * {@link LoopResources} for event loop threads, and {@link ConnectionProvider} @@ -98,37 +107,16 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif * @since 5.1 */ public ReactorClientHttpConnector(ReactorResourceFactory resourceFactory, Function mapper) { - this.httpClient = createHttpClient(resourceFactory, mapper); this.resourceFactory = resourceFactory; this.mapper = mapper; + if (resourceFactory.isRunning()) { + this.httpClient = createHttpClient(resourceFactory, mapper); + } } - private static HttpClient createHttpClient(ReactorResourceFactory resourceFactory, Function mapper) { - ConnectionProvider provider = resourceFactory.getConnectionProvider(); - Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?"); - return defaultInitializer.andThen(mapper).andThen(applyLoopResources(resourceFactory)) - .apply(HttpClient.create(provider)); - } - - private static Function applyLoopResources(ReactorResourceFactory factory) { - return httpClient -> { - LoopResources resources = factory.getLoopResources(); - Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?"); - return httpClient.runOn(resources); - }; - } - - - /** - * Constructor with a pre-configured {@code HttpClient} instance. - * @param httpClient the client to use - * @since 5.1 - */ - public ReactorClientHttpConnector(HttpClient httpClient) { - Assert.notNull(httpClient, "HttpClient is required"); - this.httpClient = httpClient; - this.resourceFactory = null; - this.mapper = null; + private static HttpClient createHttpClient(ReactorResourceFactory factory, Function mapper) { + return defaultInitializer.andThen(mapper).andThen(httpClient -> httpClient.runOn(factory.getLoopResources())) + .apply(HttpClient.create(factory.getConnectionProvider())); } @@ -136,12 +124,17 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif public Mono connect(HttpMethod method, URI uri, Function> requestCallback) { - AtomicReference responseRef = new AtomicReference<>(); + HttpClient httpClient = this.httpClient; + if (httpClient == null) { + Assert.state(this.resourceFactory != null && this.mapper != null, "Illegal configuration"); + httpClient = createHttpClient(this.resourceFactory, this.mapper); + } - HttpClient.RequestSender requestSender = this.httpClient + HttpClient.RequestSender requestSender = httpClient .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())); requestSender = setUri(requestSender, uri); + AtomicReference responseRef = new AtomicReference<>(); return requestSender .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) @@ -176,46 +169,34 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif return new ReactorClientHttpRequest(method, uri, request, nettyOutbound); } + @Override public void start() { - synchronized (this.lifecycleMonitor) { - if (!isRunning()) { - if (this.resourceFactory != null && this.mapper != null) { + if (this.resourceFactory != null && this.mapper != null) { + synchronized (this.lifecycleMonitor) { + if (this.httpClient == null) { this.httpClient = createHttpClient(this.resourceFactory, this.mapper); } - else { - logger.warn("Restarting a ReactorClientHttpConnector bean is only supported with externally managed Reactor Netty resources"); - } - this.running = true; } } + else { + logger.warn("Restarting a ReactorClientHttpConnector bean is only supported " + + "with externally managed Reactor Netty resources"); + } } @Override public void stop() { - synchronized (this.lifecycleMonitor) { - if (isRunning()) { - this.running = false; + if (this.resourceFactory != null && this.mapper != null) { + synchronized (this.lifecycleMonitor) { + this.httpClient = null; } } } - @Override - public final void stop(Runnable callback) { - synchronized (this.lifecycleMonitor) { - stop(); - callback.run(); - } - } - @Override public boolean isRunning() { - return this.running; - } - - @Override - public boolean isAutoStartup() { - return false; + return (this.httpClient != null); } @Override diff --git a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientRequestFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientRequestFactoryTests.java index 3aba01535be..6c3a34b3295 100644 --- a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientRequestFactoryTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientRequestFactoryTests.java @@ -51,7 +51,20 @@ class ReactorNettyClientRequestFactoryTests extends AbstractHttpRequestFactoryTe requestFactory.start(); assertThat(requestFactory.isRunning()).isTrue(); requestFactory.stop(); - assertThat(requestFactory.isRunning()).isFalse(); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.start(); + assertThat(requestFactory.isRunning()).isTrue(); + } + + @Test + void restartWithHttpClient() { + HttpClient httpClient = HttpClient.create(); + ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(httpClient); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.start(); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.stop(); + assertThat(requestFactory.isRunning()).isTrue(); requestFactory.start(); assertThat(requestFactory.isRunning()).isTrue(); } @@ -72,10 +85,12 @@ class ReactorNettyClientRequestFactoryTests extends AbstractHttpRequestFactoryTe } @Test - void restartWithHttpClient() { - HttpClient httpClient = HttpClient.create(); - ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(httpClient); - assertThat(requestFactory.isRunning()).isTrue(); + void lateStartWithExternalResourceFactory() { + ReactorResourceFactory resourceFactory = new ReactorResourceFactory(); + Function mapper = Function.identity(); + ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(resourceFactory, mapper); + assertThat(requestFactory.isRunning()).isFalse(); + resourceFactory.start(); requestFactory.start(); assertThat(requestFactory.isRunning()).isTrue(); requestFactory.stop(); diff --git a/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorClientHttpConnectorTests.java b/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorClientHttpConnectorTests.java index 01ef6a1d9f7..bd1579b129c 100644 --- a/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorClientHttpConnectorTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorClientHttpConnectorTests.java @@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Sebastien Deleuze + * @since 6.1 */ class ReactorClientHttpConnectorTests { @@ -37,7 +38,20 @@ class ReactorClientHttpConnectorTests { connector.start(); assertThat(connector.isRunning()).isTrue(); connector.stop(); - assertThat(connector.isRunning()).isFalse(); + assertThat(connector.isRunning()).isTrue(); + connector.start(); + assertThat(connector.isRunning()).isTrue(); + } + + @Test + void restartWithHttpClient() { + HttpClient httpClient = HttpClient.create(); + ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient); + assertThat(connector.isRunning()).isTrue(); + connector.start(); + assertThat(connector.isRunning()).isTrue(); + connector.stop(); + assertThat(connector.isRunning()).isTrue(); connector.start(); assertThat(connector.isRunning()).isTrue(); } @@ -58,10 +72,12 @@ class ReactorClientHttpConnectorTests { } @Test - void restartWithHttpClient() { - HttpClient httpClient = HttpClient.create(); - ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient); - assertThat(connector.isRunning()).isTrue(); + void lateStartWithExternalResourceFactory() { + ReactorResourceFactory resourceFactory = new ReactorResourceFactory(); + Function mapper = Function.identity(); + ReactorClientHttpConnector connector = new ReactorClientHttpConnector(resourceFactory, mapper); + assertThat(connector.isRunning()).isFalse(); + resourceFactory.start(); connector.start(); assertThat(connector.isRunning()).isTrue(); connector.stop();