Merge pull request #39958 from MazizEsa
* pr/39958: Polish "Set virtual thread names for RabbitMQ and Pulsar" Set virtual thread names for RabbitMQ and Pulsar Closes gh-39958
This commit is contained in:
commit
cd83c1af61
|
@ -76,7 +76,7 @@ class RabbitAnnotationDrivenConfiguration {
|
||||||
@ConditionalOnThreading(Threading.VIRTUAL)
|
@ConditionalOnThreading(Threading.VIRTUAL)
|
||||||
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() {
|
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() {
|
||||||
SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer();
|
SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer();
|
||||||
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
|
configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-simple-"));
|
||||||
return configurer;
|
return configurer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ class RabbitAnnotationDrivenConfiguration {
|
||||||
@ConditionalOnThreading(Threading.VIRTUAL)
|
@ConditionalOnThreading(Threading.VIRTUAL)
|
||||||
DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() {
|
DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() {
|
||||||
DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer();
|
DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer();
|
||||||
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
|
configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-direct-"));
|
||||||
return configurer;
|
return configurer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,9 +73,9 @@ import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
|
||||||
@Import(PulsarConfiguration.class)
|
@Import(PulsarConfiguration.class)
|
||||||
public class PulsarAutoConfiguration {
|
public class PulsarAutoConfiguration {
|
||||||
|
|
||||||
private PulsarProperties properties;
|
private final PulsarProperties properties;
|
||||||
|
|
||||||
private PulsarPropertiesMapper propertiesMapper;
|
private final PulsarPropertiesMapper propertiesMapper;
|
||||||
|
|
||||||
PulsarAutoConfiguration(PulsarProperties properties) {
|
PulsarAutoConfiguration(PulsarProperties properties) {
|
||||||
this.properties = properties;
|
this.properties = properties;
|
||||||
|
@ -158,7 +158,7 @@ public class PulsarAutoConfiguration {
|
||||||
containerProperties.setSchemaResolver(schemaResolver);
|
containerProperties.setSchemaResolver(schemaResolver);
|
||||||
containerProperties.setTopicResolver(topicResolver);
|
containerProperties.setTopicResolver(topicResolver);
|
||||||
if (Threading.VIRTUAL.isActive(environment)) {
|
if (Threading.VIRTUAL.isActive(environment)) {
|
||||||
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor());
|
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
|
||||||
}
|
}
|
||||||
this.propertiesMapper.customizeContainerProperties(containerProperties);
|
this.propertiesMapper.customizeContainerProperties(containerProperties);
|
||||||
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
|
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
|
||||||
|
@ -189,7 +189,7 @@ public class PulsarAutoConfiguration {
|
||||||
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
|
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
|
||||||
readerContainerProperties.setSchemaResolver(schemaResolver);
|
readerContainerProperties.setSchemaResolver(schemaResolver);
|
||||||
if (Threading.VIRTUAL.isActive(environment)) {
|
if (Threading.VIRTUAL.isActive(environment)) {
|
||||||
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor());
|
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-"));
|
||||||
}
|
}
|
||||||
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
|
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
|
||||||
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
|
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.amqp;
|
||||||
|
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.net.ssl.SSLSocketFactory;
|
import javax.net.ssl.SSLSocketFactory;
|
||||||
|
@ -545,12 +546,34 @@ class RabbitAutoConfigurationTests {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@EnabledForJreRange(min = JRE.JAVA_21)
|
@EnabledForJreRange(min = JRE.JAVA_21)
|
||||||
void shouldConfigureVirtualThreads() {
|
void shouldConfigureVirtualThreadsForSimpleListener() {
|
||||||
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
|
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
|
||||||
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
|
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
|
||||||
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
|
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
|
||||||
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
|
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
|
||||||
.isInstanceOf(VirtualThreadTaskExecutor.class);
|
.isInstanceOf(VirtualThreadTaskExecutor.class);
|
||||||
|
Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
|
||||||
|
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
|
||||||
|
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
|
||||||
|
assertThat(threadCreated.getName()).containsPattern("rabbit-simple-[0-9]+");
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@EnabledForJreRange(min = JRE.JAVA_21)
|
||||||
|
void shouldConfigureVirtualThreadsForDirectListener() {
|
||||||
|
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
|
||||||
|
DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactory = context.getBean(
|
||||||
|
"directRabbitListenerContainerFactoryConfigurer",
|
||||||
|
DirectRabbitListenerContainerFactoryConfigurer.class);
|
||||||
|
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
|
||||||
|
.isInstanceOf(VirtualThreadTaskExecutor.class);
|
||||||
|
Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
|
||||||
|
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
|
||||||
|
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
|
||||||
|
assertThat(threadCreated.getName()).containsPattern("rabbit-direct-[0-9]+");
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.pulsar;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
@ -69,6 +70,7 @@ import org.springframework.pulsar.core.PulsarTemplate;
|
||||||
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
|
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
|
||||||
import org.springframework.pulsar.core.SchemaResolver;
|
import org.springframework.pulsar.core.SchemaResolver;
|
||||||
import org.springframework.pulsar.core.TopicResolver;
|
import org.springframework.pulsar.core.TopicResolver;
|
||||||
|
import org.springframework.test.util.ReflectionTestUtils;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -506,6 +508,10 @@ class PulsarAutoConfigurationTests {
|
||||||
.getBean(ConcurrentPulsarListenerContainerFactory.class);
|
.getBean(ConcurrentPulsarListenerContainerFactory.class);
|
||||||
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
|
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
|
||||||
.isInstanceOf(VirtualThreadTaskExecutor.class);
|
.isInstanceOf(VirtualThreadTaskExecutor.class);
|
||||||
|
Object taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
|
||||||
|
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
|
||||||
|
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
|
||||||
|
assertThat(threadCreated.getName()).containsPattern("pulsar-consumer-[0-9]+");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -561,6 +567,10 @@ class PulsarAutoConfigurationTests {
|
||||||
.getBean(DefaultPulsarReaderContainerFactory.class);
|
.getBean(DefaultPulsarReaderContainerFactory.class);
|
||||||
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
|
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
|
||||||
.isInstanceOf(VirtualThreadTaskExecutor.class);
|
.isInstanceOf(VirtualThreadTaskExecutor.class);
|
||||||
|
Object taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
|
||||||
|
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
|
||||||
|
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
|
||||||
|
assertThat(threadCreated.getName()).containsPattern("pulsar-reader-[0-9]+");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue