Add RabbitMQ container forceStop property

See gh-36539
This commit is contained in:
Gary Russell 2023-07-24 16:00:29 -04:00 committed by Stephane Nicoll
parent e8dd775d1c
commit 8b716a2f6c
3 changed files with 43 additions and 2 deletions

View File

@ -118,6 +118,7 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
} }
factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal()); factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled()); factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
factory.setForceStop(configuration.isForceStop());
ListenerRetry retryConfig = configuration.getRetry(); ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) { if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()

View File

@ -734,6 +734,12 @@ public class RabbitProperties {
*/ */
private boolean deBatchingEnabled = true; private boolean deBatchingEnabled = true;
/**
* Whether the container (when stopped) should stop immediately after processing
* the current message or stop after processing all pre-fetched messages.
*/
private boolean forceStop;
/** /**
* Optional properties for a retry interceptor. * Optional properties for a retry interceptor.
*/ */
@ -781,6 +787,14 @@ public class RabbitProperties {
this.deBatchingEnabled = deBatchingEnabled; this.deBatchingEnabled = deBatchingEnabled;
} }
public boolean isForceStop() {
return this.forceStop;
}
public void setForceStop(boolean forceStop) {
this.forceStop = forceStop;
}
public ListenerRetry getRetry() { public ListenerRetry getRetry() {
return this.retry; return this.retry;
} }

View File

@ -519,7 +519,8 @@ class RabbitAutoConfigurationTests {
"spring.rabbitmq.listener.simple.defaultRequeueRejected:false", "spring.rabbitmq.listener.simple.defaultRequeueRejected:false",
"spring.rabbitmq.listener.simple.idleEventInterval:5", "spring.rabbitmq.listener.simple.idleEventInterval:5",
"spring.rabbitmq.listener.simple.batchSize:20", "spring.rabbitmq.listener.simple.batchSize:20",
"spring.rabbitmq.listener.simple.missingQueuesFatal:false") "spring.rabbitmq.listener.simple.missingQueuesFatal:false",
"spring.rabbitmq.listener.simple.force-stop:true")
.run((context) -> { .run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
@ -531,6 +532,17 @@ class RabbitAutoConfigurationTests {
}); });
} }
@Test
void testSimpleRabbitListenerContainerFactoryWithDefaultForceStop() {
this.contextRunner
.withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class)
.run((context) -> {
SimpleRabbitListenerContainerFactory containerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", false);
});
}
@Test @Test
void testDirectRabbitListenerContainerFactoryWithCustomSettings() { void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
this.contextRunner this.contextRunner
@ -547,7 +559,8 @@ class RabbitAutoConfigurationTests {
"spring.rabbitmq.listener.direct.prefetch:40", "spring.rabbitmq.listener.direct.prefetch:40",
"spring.rabbitmq.listener.direct.defaultRequeueRejected:false", "spring.rabbitmq.listener.direct.defaultRequeueRejected:false",
"spring.rabbitmq.listener.direct.idleEventInterval:5", "spring.rabbitmq.listener.direct.idleEventInterval:5",
"spring.rabbitmq.listener.direct.missingQueuesFatal:true") "spring.rabbitmq.listener.direct.missingQueuesFatal:true",
"spring.rabbitmq.listener.direct.force-stop:true")
.run((context) -> { .run((context) -> {
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = context DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class); .getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class);
@ -557,6 +570,18 @@ class RabbitAutoConfigurationTests {
}); });
} }
@Test
void testDirectRabbitListenerContainerFactoryWithDefaultForceStop() {
this.contextRunner
.withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:direct")
.run((context) -> {
DirectRabbitListenerContainerFactory containerFactory = context
.getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class);
assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", false);
});
}
@Test @Test
void testSimpleRabbitListenerContainerFactoryRetryWithCustomizer() { void testSimpleRabbitListenerContainerFactoryRetryWithCustomizer() {
this.contextRunner.withUserConfiguration(RabbitRetryTemplateCustomizerConfiguration.class) this.contextRunner.withUserConfiguration(RabbitRetryTemplateCustomizerConfiguration.class)
@ -662,6 +687,7 @@ class RabbitAutoConfigurationTests {
context.getBean("myMessageConverter")); context.getBean("myMessageConverter"));
assertThat(containerFactory).hasFieldOrPropertyWithValue("defaultRequeueRejected", Boolean.FALSE); assertThat(containerFactory).hasFieldOrPropertyWithValue("defaultRequeueRejected", Boolean.FALSE);
assertThat(containerFactory).hasFieldOrPropertyWithValue("idleEventInterval", 5L); assertThat(containerFactory).hasFieldOrPropertyWithValue("idleEventInterval", 5L);
assertThat(containerFactory).hasFieldOrPropertyWithValue("forceStop", true);
Advice[] adviceChain = containerFactory.getAdviceChain(); Advice[] adviceChain = containerFactory.getAdviceChain();
assertThat(adviceChain).isNotNull(); assertThat(adviceChain).isNotNull();
assertThat(adviceChain).hasSize(1); assertThat(adviceChain).hasSize(1);