diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java index 46d1f57c60..39f1b582bb 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java @@ -16,6 +16,7 @@ package org.springframework.http.client.reactive; +import java.time.Duration; import java.util.function.Consumer; import java.util.function.Supplier; @@ -61,6 +62,10 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean private boolean manageLoopResources = false; + private Duration shutdownQuietPeriod = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD); + + private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT); + /** * Whether to use global Reactor Netty resources via {@link HttpResources}. @@ -83,6 +88,29 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean return this.useGlobalResources; } + /** + * Configure the amount of time we'll wait before shutting down resources. If a task is + * submitted during the {@code quietPeriod}, it is guaranteed to be accepted and the + * {@code quietPeriod} will start over. + * @since 5.2.4 + * @see #setShutdownTimeout(Duration) + */ + public void setShutdownQuietPeriod(Duration shutdownQuietPeriod) { + Assert.notNull(shutdownQuietPeriod, "shutdownQuietPeriod should not be null"); + this.shutdownQuietPeriod = shutdownQuietPeriod; + } + + /** + * Configure the maximum amount of time to wait until the disposal of the underlying + * resources regardless if a task was submitted during the {@code shutdownQuietPeriod}. + * @since 5.2.4 + * @see #setShutdownTimeout(Duration) + */ + public void setShutdownTimeout(Duration shutdownTimeout) { + Assert.notNull(shutdownTimeout, "shutdownQuietPeriod should not be null"); + this.shutdownTimeout = shutdownTimeout; + } + /** * Add a Consumer for configuring the global Reactor Netty resources on * startup. When this option is used, {@link #setUseGlobalResources} is also @@ -182,7 +210,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean @Override public void destroy() { if (this.useGlobalResources) { - HttpResources.disposeLoopsAndConnectionsLater().block(); + HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); } else { try { @@ -198,7 +226,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean try { LoopResources resources = this.loopResources; if (resources != null && this.manageLoopResources) { - resources.disposeLater().block(); + resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); } } catch (Throwable ex) { diff --git a/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java index 8e3284e2c7..0389a08444 100644 --- a/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ package org.springframework.http.client.reactive; +import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -23,6 +24,7 @@ import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -41,7 +43,7 @@ public class ReactorResourceFactoryTests { @Test - public void globalResources() throws Exception { + void globalResources() throws Exception { this.resourceFactory.setUseGlobalResources(true); this.resourceFactory.afterPropertiesSet(); @@ -57,7 +59,7 @@ public class ReactorResourceFactoryTests { } @Test - public void globalResourcesWithConsumer() throws Exception { + void globalResourcesWithConsumer() throws Exception { AtomicBoolean invoked = new AtomicBoolean(false); @@ -69,7 +71,7 @@ public class ReactorResourceFactoryTests { } @Test - public void localResources() throws Exception { + void localResources() throws Exception { this.resourceFactory.setUseGlobalResources(false); this.resourceFactory.afterPropertiesSet(); @@ -91,7 +93,7 @@ public class ReactorResourceFactoryTests { } @Test - public void localResourcesViaSupplier() throws Exception { + void localResourcesViaSupplier() throws Exception { this.resourceFactory.setUseGlobalResources(false); this.resourceFactory.setConnectionProviderSupplier(() -> this.connectionProvider); @@ -110,12 +112,29 @@ public class ReactorResourceFactoryTests { // Managed (destroy disposes).. verify(this.connectionProvider).disposeLater(); - verify(this.loopResources).disposeLater(); + verify(this.loopResources).disposeLater(eq(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD)), eq(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT))); verifyNoMoreInteractions(this.connectionProvider, this.loopResources); } @Test - public void externalResources() throws Exception { + void customShutdownDurations() throws Exception { + Duration quietPeriod = Duration.ofMillis(500); + Duration shutdownTimeout = Duration.ofSeconds(1); + this.resourceFactory.setUseGlobalResources(false); + this.resourceFactory.setConnectionProviderSupplier(() -> this.connectionProvider); + this.resourceFactory.setLoopResourcesSupplier(() -> this.loopResources); + this.resourceFactory.setShutdownQuietPeriod(quietPeriod); + this.resourceFactory.setShutdownTimeout(shutdownTimeout); + this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.destroy(); + + verify(this.connectionProvider).disposeLater(); + verify(this.loopResources).disposeLater(eq(quietPeriod), eq(shutdownTimeout)); + verifyNoMoreInteractions(this.connectionProvider, this.loopResources); + } + + @Test + void externalResources() throws Exception { this.resourceFactory.setUseGlobalResources(false); this.resourceFactory.setConnectionProvider(this.connectionProvider);