Make ReactorResourceFactory lifecycle-aware

With this commit, ReactorResourceFactory now implements
Lifecycle which allows supporting JVM Checkpoint Restore
in Spring Boot with Reactor Netty server, and helps
to support Reactor Netty client as well.

Closes gh-31178
This commit is contained in:
Sébastien Deleuze 2023-09-06 12:56:01 +02:00
parent 2a916a3869
commit 125b8e7418
2 changed files with 148 additions and 45 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,6 +26,7 @@ import reactor.netty.resources.LoopResources;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.Lifecycle;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -34,14 +35,16 @@ import org.springframework.util.Assert;
* event loop threads, and {@link ConnectionProvider} for the connection pool,
* within the lifecycle of a Spring {@code ApplicationContext}.
*
* <p>This factory implements {@link InitializingBean} and {@link DisposableBean}
* and is expected typically to be declared as a Spring-managed bean.
* <p>This factory implements {@link InitializingBean}, {@link DisposableBean}
* and {@link Lifecycle} and is expected typically to be declared as a
* Spring-managed bean.
*
* @author Rossen Stoyanchev
* @author Brian Clozel
* @author Sebastien Deleuze
* @since 5.1
*/
public class ReactorResourceFactory implements InitializingBean, DisposableBean {
public class ReactorResourceFactory implements InitializingBean, DisposableBean, Lifecycle {
private boolean useGlobalResources = true;
@ -66,6 +69,10 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT);
private volatile boolean running;
private final Object lifecycleMonitor = new Object();
/**
* Whether to use global Reactor Netty resources via {@link HttpResources}.
@ -196,54 +203,84 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
@Override
public void afterPropertiesSet() {
if (this.useGlobalResources) {
Assert.isTrue(this.loopResources == null && this.connectionProvider == null,
"'useGlobalResources' is mutually exclusive with explicitly configured resources");
HttpResources httpResources = HttpResources.get();
if (this.globalResourcesConsumer != null) {
this.globalResourcesConsumer.accept(httpResources);
start();
}
@Override
public void destroy() {
stop();
}
@Override
public void start() {
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
if (this.useGlobalResources) {
Assert.isTrue(this.loopResources == null && this.connectionProvider == null,
"'useGlobalResources' is mutually exclusive with explicitly configured resources");
HttpResources httpResources = HttpResources.get();
if (this.globalResourcesConsumer != null) {
this.globalResourcesConsumer.accept(httpResources);
}
this.connectionProvider = httpResources;
this.loopResources = httpResources;
}
else {
if (this.loopResources == null) {
this.manageLoopResources = true;
this.loopResources = this.loopResourcesSupplier.get();
}
if (this.connectionProvider == null) {
this.manageConnectionProvider = true;
this.connectionProvider = this.connectionProviderSupplier.get();
}
}
this.running = true;
}
this.connectionProvider = httpResources;
this.loopResources = httpResources;
}
else {
if (this.loopResources == null) {
this.manageLoopResources = true;
this.loopResources = this.loopResourcesSupplier.get();
}
if (this.connectionProvider == null) {
this.manageConnectionProvider = true;
this.connectionProvider = this.connectionProviderSupplier.get();
}
@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
if (this.useGlobalResources) {
HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
this.connectionProvider = null;
this.loopResources = null;
}
else {
try {
ConnectionProvider provider = this.connectionProvider;
if (provider != null && this.manageConnectionProvider) {
this.connectionProvider = null;
provider.disposeLater().block();
}
}
catch (Throwable ex) {
// ignore
}
try {
LoopResources resources = this.loopResources;
if (resources != null && this.manageLoopResources) {
this.loopResources = null;
resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
}
}
catch (Throwable ex) {
// ignore
}
}
this.running = false;
}
}
}
@Override
public void destroy() {
if (this.useGlobalResources) {
HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
}
else {
try {
ConnectionProvider provider = this.connectionProvider;
if (provider != null && this.manageConnectionProvider) {
provider.disposeLater().block();
}
}
catch (Throwable ex) {
// ignore
}
try {
LoopResources resources = this.loopResources;
if (resources != null && this.manageLoopResources) {
resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
}
}
catch (Throwable ex) {
// ignore
}
}
public boolean isRunning() {
return this.running;
}
}

View File

@ -157,4 +157,70 @@ public class ReactorResourceFactoryTests {
verifyNoMoreInteractions(this.connectionProvider, this.loopResources);
}
@Test
void stopThenStartWithGlobalResources() {
this.resourceFactory.setUseGlobalResources(true);
this.resourceFactory.afterPropertiesSet();
this.resourceFactory.stop();
this.resourceFactory.start();
HttpResources globalResources = HttpResources.get();
assertThat(this.resourceFactory.getConnectionProvider()).isSameAs(globalResources);
assertThat(this.resourceFactory.getLoopResources()).isSameAs(globalResources);
assertThat(globalResources.isDisposed()).isFalse();
this.resourceFactory.destroy();
assertThat(globalResources.isDisposed()).isTrue();
}
@Test
void stopThenStartWithLocalResources() {
this.resourceFactory.setUseGlobalResources(false);
this.resourceFactory.afterPropertiesSet();
this.resourceFactory.stop();
this.resourceFactory.start();
ConnectionProvider connectionProvider = this.resourceFactory.getConnectionProvider();
LoopResources loopResources = this.resourceFactory.getLoopResources();
assertThat(connectionProvider).isNotSameAs(HttpResources.get());
assertThat(loopResources).isNotSameAs(HttpResources.get());
// The below does not work since ConnectionPoolProvider simply checks if pool is empty.
// assertFalse(connectionProvider.isDisposed());
assertThat(loopResources.isDisposed()).isFalse();
this.resourceFactory.destroy();
assertThat(connectionProvider.isDisposed()).isTrue();
assertThat(loopResources.isDisposed()).isTrue();
}
@Test
void stopThenStartWithExternalResources() {
this.resourceFactory.setUseGlobalResources(false);
this.resourceFactory.setConnectionProvider(this.connectionProvider);
this.resourceFactory.setLoopResources(this.loopResources);
this.resourceFactory.afterPropertiesSet();
this.resourceFactory.stop();
this.resourceFactory.start();
ConnectionProvider connectionProvider = this.resourceFactory.getConnectionProvider();
LoopResources loopResources = this.resourceFactory.getLoopResources();
assertThat(connectionProvider).isSameAs(this.connectionProvider);
assertThat(loopResources).isSameAs(this.loopResources);
verifyNoMoreInteractions(this.connectionProvider, this.loopResources);
this.resourceFactory.destroy();
// Not managed (destroy has no impact)...
verifyNoMoreInteractions(this.connectionProvider, this.loopResources);
}
}