diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java index 154ac99ceae..781029791b2 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.amqp; import java.util.List; +import java.util.concurrent.Executor; import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; @@ -47,6 +48,8 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 51decebff51..2a81759aeaf 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.thread.VirtualThreads; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -51,13 +52,17 @@ class RabbitAnnotationDrivenConfiguration { private final RabbitProperties properties; + private final ObjectProvider virtualThreads; + RabbitAnnotationDrivenConfiguration(ObjectProvider messageConverter, ObjectProvider messageRecoverer, - ObjectProvider retryTemplateCustomizers, RabbitProperties properties) { + ObjectProvider retryTemplateCustomizers, RabbitProperties properties, + ObjectProvider virtualThreads) { this.messageConverter = messageConverter; this.messageRecoverer = messageRecoverer; this.retryTemplateCustomizers = retryTemplateCustomizers; this.properties = properties; + this.virtualThreads = virtualThreads; } @Bean @@ -68,6 +73,7 @@ class RabbitAnnotationDrivenConfiguration { configurer.setMessageConverter(this.messageConverter.getIfUnique()); configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); configurer.setRetryTemplateCustomizers(this.retryTemplateCustomizers.orderedStream().toList()); + this.virtualThreads.ifAvailable((virtualThreads) -> configurer.setTaskExecutor(virtualThreads.getExecutor())); return configurer; } @@ -92,6 +98,7 @@ class RabbitAnnotationDrivenConfiguration { configurer.setMessageConverter(this.messageConverter.getIfUnique()); configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); configurer.setRetryTemplateCustomizers(this.retryTemplateCustomizers.orderedStream().toList()); + this.virtualThreads.ifAvailable((virtualThreads) -> configurer.setTaskExecutor(virtualThreads.getExecutor())); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 363a4153b8d..afe3271b489 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -30,6 +30,8 @@ import com.rabbitmq.client.impl.CredentialsRefreshService; import com.rabbitmq.client.impl.DefaultCredentialsProvider; import org.aopalliance.aop.Advice; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InOrder; @@ -58,6 +60,7 @@ import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.thread.VirtualThreadsAutoConfiguration; import org.springframework.boot.test.context.assertj.AssertableApplicationContext; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.system.CapturedOutput; @@ -532,6 +535,19 @@ class RabbitAutoConfigurationTests { }); } + @Test + @EnabledForJreRange(min = JRE.JAVA_21) + void shouldConfigureVirtualThreads() { + this.contextRunner.withConfiguration(AutoConfigurations.of(VirtualThreadsAutoConfiguration.class)) + .withPropertyValues("spring.threads.virtual.enabled=true") + .run((context) -> { + SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context + .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); + Object executor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor"); + assertThat(executor).as("rabbitListenerContainerFactory.taskExecutor").isNotNull(); + }); + } + @Test void testSimpleRabbitListenerContainerFactoryWithDefaultForceStop() { this.contextRunner