Configure quiet period for shutting down Reactor resources
This commit adds two new properties to the `ReactorResourceFactory`. This allows to configure the quiet and timeout periods when shutting down Reactor resources. While we'll retain Reactor Netty's default for production use, this option is useful for tests and developement environments when developers want to avoid long waiting times when shutting down resources. Fixes gh-24538
This commit is contained in:
parent
96de4b3cee
commit
f1680e5cee
|
@ -16,6 +16,7 @@
|
|||
|
||||
package org.springframework.http.client.reactive;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -61,6 +62,10 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
|
|||
|
||||
private boolean manageLoopResources = false;
|
||||
|
||||
private Duration shutdownQuietPeriod = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD);
|
||||
|
||||
private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT);
|
||||
|
||||
|
||||
/**
|
||||
* Whether to use global Reactor Netty resources via {@link HttpResources}.
|
||||
|
@ -83,6 +88,29 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
|
|||
return this.useGlobalResources;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the amount of time we'll wait before shutting down resources. If a task is
|
||||
* submitted during the {@code quietPeriod}, it is guaranteed to be accepted and the
|
||||
* {@code quietPeriod} will start over.
|
||||
* @since 5.2.4
|
||||
* @see #setShutdownTimeout(Duration)
|
||||
*/
|
||||
public void setShutdownQuietPeriod(Duration shutdownQuietPeriod) {
|
||||
Assert.notNull(shutdownQuietPeriod, "shutdownQuietPeriod should not be null");
|
||||
this.shutdownQuietPeriod = shutdownQuietPeriod;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the maximum amount of time to wait until the disposal of the underlying
|
||||
* resources regardless if a task was submitted during the {@code shutdownQuietPeriod}.
|
||||
* @since 5.2.4
|
||||
* @see #setShutdownTimeout(Duration)
|
||||
*/
|
||||
public void setShutdownTimeout(Duration shutdownTimeout) {
|
||||
Assert.notNull(shutdownTimeout, "shutdownQuietPeriod should not be null");
|
||||
this.shutdownTimeout = shutdownTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a Consumer for configuring the global Reactor Netty resources on
|
||||
* startup. When this option is used, {@link #setUseGlobalResources} is also
|
||||
|
@ -182,7 +210,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
|
|||
@Override
|
||||
public void destroy() {
|
||||
if (this.useGlobalResources) {
|
||||
HttpResources.disposeLoopsAndConnectionsLater().block();
|
||||
HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
|
||||
}
|
||||
else {
|
||||
try {
|
||||
|
@ -198,7 +226,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
|
|||
try {
|
||||
LoopResources resources = this.loopResources;
|
||||
if (resources != null && this.manageLoopResources) {
|
||||
resources.disposeLater().block();
|
||||
resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
|
||||
}
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
* Copyright 2002-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -15,6 +15,7 @@
|
|||
*/
|
||||
package org.springframework.http.client.reactive;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -23,6 +24,7 @@ import reactor.netty.resources.ConnectionProvider;
|
|||
import reactor.netty.resources.LoopResources;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
|
@ -41,7 +43,7 @@ public class ReactorResourceFactoryTests {
|
|||
|
||||
|
||||
@Test
|
||||
public void globalResources() throws Exception {
|
||||
void globalResources() throws Exception {
|
||||
|
||||
this.resourceFactory.setUseGlobalResources(true);
|
||||
this.resourceFactory.afterPropertiesSet();
|
||||
|
@ -57,7 +59,7 @@ public class ReactorResourceFactoryTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void globalResourcesWithConsumer() throws Exception {
|
||||
void globalResourcesWithConsumer() throws Exception {
|
||||
|
||||
AtomicBoolean invoked = new AtomicBoolean(false);
|
||||
|
||||
|
@ -69,7 +71,7 @@ public class ReactorResourceFactoryTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void localResources() throws Exception {
|
||||
void localResources() throws Exception {
|
||||
|
||||
this.resourceFactory.setUseGlobalResources(false);
|
||||
this.resourceFactory.afterPropertiesSet();
|
||||
|
@ -91,7 +93,7 @@ public class ReactorResourceFactoryTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void localResourcesViaSupplier() throws Exception {
|
||||
void localResourcesViaSupplier() throws Exception {
|
||||
|
||||
this.resourceFactory.setUseGlobalResources(false);
|
||||
this.resourceFactory.setConnectionProviderSupplier(() -> this.connectionProvider);
|
||||
|
@ -110,12 +112,29 @@ public class ReactorResourceFactoryTests {
|
|||
|
||||
// Managed (destroy disposes)..
|
||||
verify(this.connectionProvider).disposeLater();
|
||||
verify(this.loopResources).disposeLater();
|
||||
verify(this.loopResources).disposeLater(eq(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD)), eq(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT)));
|
||||
verifyNoMoreInteractions(this.connectionProvider, this.loopResources);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void externalResources() throws Exception {
|
||||
void customShutdownDurations() throws Exception {
|
||||
Duration quietPeriod = Duration.ofMillis(500);
|
||||
Duration shutdownTimeout = Duration.ofSeconds(1);
|
||||
this.resourceFactory.setUseGlobalResources(false);
|
||||
this.resourceFactory.setConnectionProviderSupplier(() -> this.connectionProvider);
|
||||
this.resourceFactory.setLoopResourcesSupplier(() -> this.loopResources);
|
||||
this.resourceFactory.setShutdownQuietPeriod(quietPeriod);
|
||||
this.resourceFactory.setShutdownTimeout(shutdownTimeout);
|
||||
this.resourceFactory.afterPropertiesSet();
|
||||
this.resourceFactory.destroy();
|
||||
|
||||
verify(this.connectionProvider).disposeLater();
|
||||
verify(this.loopResources).disposeLater(eq(quietPeriod), eq(shutdownTimeout));
|
||||
verifyNoMoreInteractions(this.connectionProvider, this.loopResources);
|
||||
}
|
||||
|
||||
@Test
|
||||
void externalResources() throws Exception {
|
||||
|
||||
this.resourceFactory.setUseGlobalResources(false);
|
||||
this.resourceFactory.setConnectionProvider(this.connectionProvider);
|
||||
|
|
Loading…
Reference in New Issue