diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequest.java b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequest.java index 6c42becdf77..b399ca27a25 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,7 +101,7 @@ final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest return result; } } - catch (RuntimeException ex) { // Exceptions.ReactiveException is package private + catch (RuntimeException ex) { // Exceptions.ReactiveException is package private Throwable cause = ex.getCause(); if (cause instanceof UncheckedIOException uioEx) { @@ -111,7 +111,7 @@ final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest throw ioEx; } else { - throw ex; + throw new IOException(ex.getMessage(), cause); } } } diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/ReactorResourceFactory.java index acc26a52059..f6ca7af2f27 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorResourceFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorResourceFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,9 @@ import reactor.netty.resources.LoopResources; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.Lifecycle; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.SmartLifecycle; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -35,20 +37,21 @@ import org.springframework.util.Assert; * event loop threads, and {@link ConnectionProvider} for the connection pool, * within the lifecycle of a Spring {@code ApplicationContext}. * - *

This factory implements {@link InitializingBean}, {@link DisposableBean} - * and {@link Lifecycle} and is expected typically to be declared as a - * Spring-managed bean. + *

This factory implements {@link SmartLifecycle} and is expected typically + * to be declared as a Spring-managed bean. * - *

Notice that after a {@link Lifecycle} stop/restart, new instances of + *

Notice that after a {@link SmartLifecycle} stop/restart, new instances of * the configured {@link LoopResources} and {@link ConnectionProvider} are * created, so any references to those should be updated. * * @author Rossen Stoyanchev * @author Brian Clozel * @author Sebastien Deleuze + * @author Juergen Hoeller * @since 6.1 */ -public class ReactorResourceFactory implements InitializingBean, DisposableBean, Lifecycle { +public class ReactorResourceFactory + implements ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle { private boolean useGlobalResources = true; @@ -73,6 +76,9 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean, private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT); + @Nullable + private ApplicationContext applicationContext; + private volatile boolean running; private final Object lifecycleMonitor = new Object(); @@ -202,15 +208,30 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean, this.shutdownTimeout = shutdownTimeout; } + /** + * Setting an {@link ApplicationContext} is optional: If set, Reactor resources + * will be initialized in the {@link #start() lifecycle start} phase and closed + * in the {@link #stop() lifecycle stop} phase. If not set, it will happen in + * {@link #afterPropertiesSet()} and {@link #destroy()}, respectively. + */ + @Override + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + @Override public void afterPropertiesSet() { - start(); + if (this.applicationContext == null) { + start(); + } } @Override public void destroy() { - stop(); + if (this.applicationContext == null) { + stop(); + } } @Override diff --git a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientRequestFactoryTests.java similarity index 96% rename from spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java rename to spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientRequestFactoryTests.java index bff1f3ea353..3aba01535be 100644 --- a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientRequestFactoryTests.java @@ -28,8 +28,9 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Arjen Poutsma * @author Sebastien Deleuze + * @since 6.1 */ -class ReactorNettyClientHttpRequestFactoryTests extends AbstractHttpRequestFactoryTests { +class ReactorNettyClientRequestFactoryTests extends AbstractHttpRequestFactoryTests { @Override protected ClientHttpRequestFactory createRequestFactory() { diff --git a/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/ReactorResourceFactoryTests.java similarity index 75% rename from spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java rename to spring-web/src/test/java/org/springframework/http/client/ReactorResourceFactoryTests.java index 4fa6220c648..a08c8983a6b 100644 --- a/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/ReactorResourceFactoryTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.client.reactive; +package org.springframework.http.client; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; @@ -24,7 +24,7 @@ import reactor.netty.http.HttpResources; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; -import org.springframework.http.client.ReactorResourceFactory; +import org.springframework.context.support.GenericApplicationContext; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.eq; @@ -37,6 +37,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; * * @author Rossen Stoyanchev * @author Sebastien Deleuze + * @author Juergen Hoeller */ class ReactorResourceFactoryTests { @@ -49,37 +50,34 @@ class ReactorResourceFactoryTests { @Test void globalResources() { - this.resourceFactory.setUseGlobalResources(true); - this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.start(); HttpResources globalResources = HttpResources.get(); assertThat(this.resourceFactory.getConnectionProvider()).isSameAs(globalResources); assertThat(this.resourceFactory.getLoopResources()).isSameAs(globalResources); assertThat(globalResources.isDisposed()).isFalse(); - this.resourceFactory.destroy(); + this.resourceFactory.stop(); assertThat(globalResources.isDisposed()).isTrue(); } @Test void globalResourcesWithConsumer() { - AtomicBoolean invoked = new AtomicBoolean(); this.resourceFactory.addGlobalResourcesConsumer(httpResources -> invoked.set(true)); - this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.start(); assertThat(invoked.get()).isTrue(); - this.resourceFactory.destroy(); + this.resourceFactory.stop(); } @Test void localResources() { - this.resourceFactory.setUseGlobalResources(false); - this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.start(); ConnectionProvider connectionProvider = this.resourceFactory.getConnectionProvider(); LoopResources loopResources = this.resourceFactory.getLoopResources(); @@ -91,7 +89,7 @@ class ReactorResourceFactoryTests { // assertFalse(connectionProvider.isDisposed()); assertThat(loopResources.isDisposed()).isFalse(); - this.resourceFactory.destroy(); + this.resourceFactory.stop(); assertThat(connectionProvider.isDisposed()).isTrue(); assertThat(loopResources.isDisposed()).isTrue(); @@ -99,11 +97,10 @@ class ReactorResourceFactoryTests { @Test void localResourcesViaSupplier() { - this.resourceFactory.setUseGlobalResources(false); this.resourceFactory.setConnectionProviderSupplier(() -> this.connectionProvider); this.resourceFactory.setLoopResourcesSupplier(() -> this.loopResources); - this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.start(); ConnectionProvider connectionProvider = this.resourceFactory.getConnectionProvider(); LoopResources loopResources = this.resourceFactory.getLoopResources(); @@ -113,9 +110,9 @@ class ReactorResourceFactoryTests { verifyNoMoreInteractions(this.connectionProvider, this.loopResources); - this.resourceFactory.destroy(); + this.resourceFactory.stop(); - // Managed (destroy disposes).. + // Managed (stop disposes).. verify(this.connectionProvider).disposeLater(); verify(this.loopResources).disposeLater(eq(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD)), eq(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT))); verifyNoMoreInteractions(this.connectionProvider, this.loopResources); @@ -130,8 +127,8 @@ class ReactorResourceFactoryTests { this.resourceFactory.setLoopResourcesSupplier(() -> this.loopResources); this.resourceFactory.setShutdownQuietPeriod(quietPeriod); this.resourceFactory.setShutdownTimeout(shutdownTimeout); - this.resourceFactory.afterPropertiesSet(); - this.resourceFactory.destroy(); + this.resourceFactory.start(); + this.resourceFactory.stop(); verify(this.connectionProvider).disposeLater(); verify(this.loopResources).disposeLater(eq(quietPeriod), eq(shutdownTimeout)); @@ -140,11 +137,10 @@ class ReactorResourceFactoryTests { @Test void externalResources() { - this.resourceFactory.setUseGlobalResources(false); this.resourceFactory.setConnectionProvider(this.connectionProvider); this.resourceFactory.setLoopResources(this.loopResources); - this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.start(); ConnectionProvider connectionProvider = this.resourceFactory.getConnectionProvider(); LoopResources loopResources = this.resourceFactory.getLoopResources(); @@ -154,17 +150,16 @@ class ReactorResourceFactoryTests { verifyNoMoreInteractions(this.connectionProvider, this.loopResources); - this.resourceFactory.destroy(); + this.resourceFactory.stop(); - // Not managed (destroy has no impact) + // Not managed (stop has no impact) verifyNoMoreInteractions(this.connectionProvider, this.loopResources); } @Test void restartWithGlobalResources() { - this.resourceFactory.setUseGlobalResources(true); - this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.start(); this.resourceFactory.stop(); this.resourceFactory.start(); @@ -173,16 +168,15 @@ class ReactorResourceFactoryTests { assertThat(this.resourceFactory.getLoopResources()).isSameAs(globalResources); assertThat(globalResources.isDisposed()).isFalse(); - this.resourceFactory.destroy(); + this.resourceFactory.stop(); assertThat(globalResources.isDisposed()).isTrue(); } @Test void restartWithLocalResources() { - this.resourceFactory.setUseGlobalResources(false); - this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.start(); this.resourceFactory.stop(); this.resourceFactory.start(); @@ -196,7 +190,7 @@ class ReactorResourceFactoryTests { // assertFalse(connectionProvider.isDisposed()); assertThat(loopResources.isDisposed()).isFalse(); - this.resourceFactory.destroy(); + this.resourceFactory.stop(); assertThat(connectionProvider.isDisposed()).isTrue(); assertThat(loopResources.isDisposed()).isTrue(); @@ -204,11 +198,10 @@ class ReactorResourceFactoryTests { @Test void restartWithExternalResources() { - this.resourceFactory.setUseGlobalResources(false); this.resourceFactory.setConnectionProvider(this.connectionProvider); this.resourceFactory.setLoopResources(this.loopResources); - this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.start(); this.resourceFactory.stop(); this.resourceFactory.start(); @@ -220,10 +213,52 @@ class ReactorResourceFactoryTests { verifyNoMoreInteractions(this.connectionProvider, this.loopResources); - this.resourceFactory.destroy(); + this.resourceFactory.stop(); - // Not managed (destroy has no impact)... + // Not managed (stop has no impact)... verifyNoMoreInteractions(this.connectionProvider, this.loopResources); } + @Test + void restartWithinApplicationContext() { + GenericApplicationContext context = new GenericApplicationContext(); + context.registerBean(ReactorResourceFactory.class); + context.refresh(); + + ReactorResourceFactory resourceFactory = context.getBean(ReactorResourceFactory.class); + assertThat(resourceFactory.isRunning()).isTrue(); + + HttpResources globalResources = HttpResources.get(); + assertThat(resourceFactory.getConnectionProvider()).isSameAs(globalResources); + assertThat(resourceFactory.getLoopResources()).isSameAs(globalResources); + assertThat(globalResources.isDisposed()).isFalse(); + + context.stop(); + assertThat(globalResources.isDisposed()).isTrue(); + + context.start(); + globalResources = HttpResources.get(); + assertThat(resourceFactory.getConnectionProvider()).isSameAs(globalResources); + assertThat(resourceFactory.getLoopResources()).isSameAs(globalResources); + assertThat(globalResources.isDisposed()).isFalse(); + assertThat(globalResources.isDisposed()).isFalse(); + + context.close(); + assertThat(globalResources.isDisposed()).isTrue(); + } + + @Test + void doNotStartBeforeApplicationContextFinish() { + GenericApplicationContext context = new GenericApplicationContext() { + @Override + protected void finishRefresh() { + } + }; + context.registerBean(ReactorResourceFactory.class); + context.refresh(); + + ReactorResourceFactory resourceFactory = context.getBeanFactory().getBean(ReactorResourceFactory.class); + assertThat(resourceFactory.isRunning()).isFalse(); + } + }