parent
6eddf1b372
commit
2894e57146
|
|
@ -30,10 +30,9 @@ import org.springframework.util.Assert;
|
||||||
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
||||||
*
|
*
|
||||||
* @param <T> the container factory type.
|
* @param <T> the container factory type.
|
||||||
*
|
|
||||||
* @author Gary Russell
|
* @author Gary Russell
|
||||||
* @since 2.0
|
* @author Stephane Nicoll
|
||||||
*
|
* @since 2.0.0
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractRabbitListenerContainerFactoryConfigurer<
|
public abstract class AbstractRabbitListenerContainerFactoryConfigurer<
|
||||||
T extends AbstractRabbitListenerContainerFactory<?>> {
|
T extends AbstractRabbitListenerContainerFactory<?>> {
|
||||||
|
|
@ -69,6 +68,10 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<
|
||||||
this.rabbitProperties = rabbitProperties;
|
this.rabbitProperties = rabbitProperties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected final RabbitProperties getRabbitProperties() {
|
||||||
|
return this.rabbitProperties;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure the specified rabbit listener container factory. The factory can be
|
* Configure the specified rabbit listener container factory. The factory can be
|
||||||
* further tuned and default settings can be overridden.
|
* further tuned and default settings can be overridden.
|
||||||
|
|
@ -76,28 +79,32 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<
|
||||||
* configure
|
* configure
|
||||||
* @param connectionFactory the {@link ConnectionFactory} to use
|
* @param connectionFactory the {@link ConnectionFactory} to use
|
||||||
*/
|
*/
|
||||||
public final void configure(T factory, ConnectionFactory connectionFactory) {
|
public abstract void configure(T factory, ConnectionFactory connectionFactory);
|
||||||
|
|
||||||
|
|
||||||
|
protected void configure(T factory, ConnectionFactory connectionFactory,
|
||||||
|
RabbitProperties.AmqpContainer configuration) {
|
||||||
Assert.notNull(factory, "Factory must not be null");
|
Assert.notNull(factory, "Factory must not be null");
|
||||||
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
|
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
|
||||||
|
Assert.notNull(configuration, "Configuration must not be null");
|
||||||
factory.setConnectionFactory(connectionFactory);
|
factory.setConnectionFactory(connectionFactory);
|
||||||
if (this.messageConverter != null) {
|
if (this.messageConverter != null) {
|
||||||
factory.setMessageConverter(this.messageConverter);
|
factory.setMessageConverter(this.messageConverter);
|
||||||
}
|
}
|
||||||
RabbitProperties.Listener listenerConfig = this.rabbitProperties.getListener();
|
factory.setAutoStartup(configuration.isAutoStartup());
|
||||||
factory.setAutoStartup(listenerConfig.isAutoStartup());
|
if (configuration.getAcknowledgeMode() != null) {
|
||||||
if (listenerConfig.getAcknowledgeMode() != null) {
|
factory.setAcknowledgeMode(configuration.getAcknowledgeMode());
|
||||||
factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode());
|
|
||||||
}
|
}
|
||||||
if (listenerConfig.getPrefetch() != null) {
|
if (configuration.getPrefetch() != null) {
|
||||||
factory.setPrefetchCount(listenerConfig.getPrefetch());
|
factory.setPrefetchCount(configuration.getPrefetch());
|
||||||
}
|
}
|
||||||
if (listenerConfig.getDefaultRequeueRejected() != null) {
|
if (configuration.getDefaultRequeueRejected() != null) {
|
||||||
factory.setDefaultRequeueRejected(listenerConfig.getDefaultRequeueRejected());
|
factory.setDefaultRequeueRejected(configuration.getDefaultRequeueRejected());
|
||||||
}
|
}
|
||||||
if (listenerConfig.getIdleEventInterval() != null) {
|
if (configuration.getIdleEventInterval() != null) {
|
||||||
factory.setIdleEventInterval(listenerConfig.getIdleEventInterval());
|
factory.setIdleEventInterval(configuration.getIdleEventInterval());
|
||||||
}
|
}
|
||||||
ListenerRetry retryConfig = listenerConfig.getRetry();
|
ListenerRetry retryConfig = configuration.getRetry();
|
||||||
if (retryConfig.isEnabled()) {
|
if (retryConfig.isEnabled()) {
|
||||||
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
|
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
|
||||||
? RetryInterceptorBuilder.stateless()
|
? RetryInterceptorBuilder.stateless()
|
||||||
|
|
@ -110,15 +117,6 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<
|
||||||
builder.recoverer(recoverer);
|
builder.recoverer(recoverer);
|
||||||
factory.setAdviceChain(builder.build());
|
factory.setAdviceChain(builder.build());
|
||||||
}
|
}
|
||||||
configure(factory, this.rabbitProperties);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Perform factory-specific configuration.
|
|
||||||
*
|
|
||||||
* @param factory the factory.
|
|
||||||
* @param rabbitProperties the properties.
|
|
||||||
*/
|
|
||||||
protected abstract void configure(T factory, RabbitProperties rabbitProperties);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,22 +17,25 @@
|
||||||
package org.springframework.boot.autoconfigure.amqp;
|
package org.springframework.boot.autoconfigure.amqp;
|
||||||
|
|
||||||
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
* Configure {@link DirectRabbitListenerContainerFactoryConfigurer} with sensible defaults.
|
||||||
*
|
*
|
||||||
* @author Gary Russell
|
* @author Gary Russell
|
||||||
|
* @author Stephane Nicoll
|
||||||
* @since 2.0
|
* @since 2.0
|
||||||
*/
|
*/
|
||||||
public final class DirectRabbitListenerContainerFactoryConfigurer
|
public final class DirectRabbitListenerContainerFactoryConfigurer
|
||||||
extends AbstractRabbitListenerContainerFactoryConfigurer<DirectRabbitListenerContainerFactory> {
|
extends AbstractRabbitListenerContainerFactoryConfigurer<DirectRabbitListenerContainerFactory> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure(DirectRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) {
|
public void configure(DirectRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
|
||||||
RabbitProperties.Listener listenerConfig = rabbitProperties.getListener();
|
RabbitProperties.DirectContainer config = getRabbitProperties().getListener()
|
||||||
if (listenerConfig.getConsumersPerQueue() != null) {
|
.getDirect();
|
||||||
factory.setConsumersPerQueue(listenerConfig.getConsumersPerQueue());
|
configure(factory, connectionFactory, config);
|
||||||
|
if (config.getConsumersPerQueue() != null) {
|
||||||
|
factory.setConsumersPerQueue(config.getConsumersPerQueue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -41,11 +41,11 @@ import org.springframework.context.annotation.Configuration;
|
||||||
@ConditionalOnClass(EnableRabbit.class)
|
@ConditionalOnClass(EnableRabbit.class)
|
||||||
class RabbitAnnotationDrivenConfiguration {
|
class RabbitAnnotationDrivenConfiguration {
|
||||||
|
|
||||||
protected final ObjectProvider<MessageConverter> messageConverter;
|
private final ObjectProvider<MessageConverter> messageConverter;
|
||||||
|
|
||||||
protected final ObjectProvider<MessageRecoverer> messageRecoverer;
|
private final ObjectProvider<MessageRecoverer> messageRecoverer;
|
||||||
|
|
||||||
protected final RabbitProperties properties;
|
private final RabbitProperties properties;
|
||||||
|
|
||||||
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
|
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
|
||||||
ObjectProvider<MessageRecoverer> messageRecoverer,
|
ObjectProvider<MessageRecoverer> messageRecoverer,
|
||||||
|
|
@ -55,51 +55,31 @@ class RabbitAnnotationDrivenConfiguration {
|
||||||
this.properties = properties;
|
this.properties = properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "simple",
|
|
||||||
matchIfMissing = true)
|
|
||||||
public static class SimpleContainerConfiguration extends RabbitAnnotationDrivenConfiguration {
|
|
||||||
|
|
||||||
SimpleContainerConfiguration(ObjectProvider<MessageConverter> messageConverter,
|
|
||||||
ObjectProvider<MessageRecoverer> messageRecoverer, RabbitProperties properties) {
|
|
||||||
super(messageConverter, messageRecoverer, properties);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@ConditionalOnMissingBean
|
@ConditionalOnMissingBean
|
||||||
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
|
public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() {
|
||||||
SimpleRabbitListenerContainerFactoryConfigurer configurer =
|
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
|
||||||
new SimpleRabbitListenerContainerFactoryConfigurer();
|
|
||||||
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
||||||
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
|
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
|
||||||
configurer.setRabbitProperties(this.properties);
|
configurer.setRabbitProperties(this.properties);
|
||||||
return configurer;
|
return configurer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean(name = "rabbitListenerContainerFactory")
|
||||||
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
|
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
|
||||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
|
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
|
||||||
|
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
|
||||||
SimpleRabbitListenerContainerFactoryConfigurer configurer,
|
SimpleRabbitListenerContainerFactoryConfigurer configurer,
|
||||||
ConnectionFactory connectionFactory) {
|
ConnectionFactory connectionFactory) {
|
||||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
configurer.configure(factory, connectionFactory);
|
configurer.configure(factory, connectionFactory);
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "direct")
|
|
||||||
public static class DirectContainerConfiguration extends RabbitAnnotationDrivenConfiguration {
|
|
||||||
|
|
||||||
DirectContainerConfiguration(ObjectProvider<MessageConverter> messageConverter,
|
|
||||||
ObjectProvider<MessageRecoverer> messageRecoverer, RabbitProperties properties) {
|
|
||||||
super(messageConverter, messageRecoverer, properties);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@ConditionalOnMissingBean
|
@ConditionalOnMissingBean
|
||||||
public DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
|
public DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurer() {
|
||||||
DirectRabbitListenerContainerFactoryConfigurer configurer =
|
DirectRabbitListenerContainerFactoryConfigurer configurer =
|
||||||
new DirectRabbitListenerContainerFactoryConfigurer();
|
new DirectRabbitListenerContainerFactoryConfigurer();
|
||||||
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
||||||
|
|
@ -108,16 +88,16 @@ class RabbitAnnotationDrivenConfiguration {
|
||||||
return configurer;
|
return configurer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean(name = "rabbitListenerContainerFactory")
|
||||||
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
|
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
|
||||||
public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory(
|
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "direct")
|
||||||
|
public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
|
||||||
DirectRabbitListenerContainerFactoryConfigurer configurer,
|
DirectRabbitListenerContainerFactoryConfigurer configurer,
|
||||||
ConnectionFactory connectionFactory) {
|
ConnectionFactory connectionFactory) {
|
||||||
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
|
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
|
||||||
configurer.configure(factory, connectionFactory);
|
configurer.configure(factory, connectionFactory);
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@EnableRabbit
|
@EnableRabbit
|
||||||
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
|
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
|
||||||
|
|
|
||||||
|
|
@ -40,22 +40,6 @@ import org.springframework.util.StringUtils;
|
||||||
@ConfigurationProperties(prefix = "spring.rabbitmq")
|
@ConfigurationProperties(prefix = "spring.rabbitmq")
|
||||||
public class RabbitProperties {
|
public class RabbitProperties {
|
||||||
|
|
||||||
public enum ContainerType {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* SimpleMessageListenerContainer - legacy container where the RabbitMQ consumer
|
|
||||||
* dispatches messages to an invoker thread.
|
|
||||||
*/
|
|
||||||
SIMPLE,
|
|
||||||
|
|
||||||
/**
|
|
||||||
* DirectMessageListenerContainer - container where the listener is invoked
|
|
||||||
* directly on the RabbitMQ consumer thread.
|
|
||||||
*/
|
|
||||||
DIRECT
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RabbitMQ host.
|
* RabbitMQ host.
|
||||||
*/
|
*/
|
||||||
|
|
@ -481,10 +465,42 @@ public class RabbitProperties {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public enum ContainerType {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Legacy container where the RabbitMQ consumer dispatches messages to an
|
||||||
|
* invoker thread.
|
||||||
|
*/
|
||||||
|
SIMPLE,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Container where the listener is invoked directly on the RabbitMQ consumer
|
||||||
|
* thread.
|
||||||
|
*/
|
||||||
|
DIRECT
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public static class Listener {
|
public static class Listener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listener container type.
|
||||||
|
*/
|
||||||
|
private ContainerType type = ContainerType.SIMPLE;
|
||||||
|
|
||||||
@NestedConfigurationProperty
|
@NestedConfigurationProperty
|
||||||
private final AmqpContainer simple = new AmqpContainer();
|
private final SimpleContainer simple = new SimpleContainer();
|
||||||
|
|
||||||
|
@NestedConfigurationProperty
|
||||||
|
private final DirectContainer direct = new DirectContainer();
|
||||||
|
|
||||||
|
public ContainerType getType() {
|
||||||
|
return this.type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setType(ContainerType containerType) {
|
||||||
|
this.type = containerType;
|
||||||
|
}
|
||||||
|
|
||||||
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.auto-startup")
|
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.auto-startup")
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
|
@ -580,18 +596,17 @@ public class RabbitProperties {
|
||||||
return getSimple().getRetry();
|
return getSimple().getRetry();
|
||||||
}
|
}
|
||||||
|
|
||||||
public AmqpContainer getSimple() {
|
public SimpleContainer getSimple() {
|
||||||
return this.simple;
|
return this.simple;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DirectContainer getDirect() {
|
||||||
|
return this.direct;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class AmqpContainer {
|
}
|
||||||
|
|
||||||
/**
|
public static abstract class AmqpContainer {
|
||||||
* Container type.
|
|
||||||
*/
|
|
||||||
private ContainerType type = ContainerType.SIMPLE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the container automatically on startup.
|
* Start the container automatically on startup.
|
||||||
|
|
@ -603,34 +618,12 @@ public class RabbitProperties {
|
||||||
*/
|
*/
|
||||||
private AcknowledgeMode acknowledgeMode;
|
private AcknowledgeMode acknowledgeMode;
|
||||||
|
|
||||||
/**
|
|
||||||
* Minimum number of listener invoker threads - applies only to simple containers.
|
|
||||||
*/
|
|
||||||
private Integer concurrency;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Maximum number of listener invoker threads - applies only to simple containers.
|
|
||||||
*/
|
|
||||||
private Integer maxConcurrency;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Number of RabbitMQ consumers per queue - applies only to direct containers.
|
|
||||||
*/
|
|
||||||
private Integer consumersPerQueue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Number of messages to be handled in a single request. It should be greater than
|
* Number of messages to be handled in a single request. It should be greater than
|
||||||
* or equal to the transaction size (if used).
|
* or equal to the transaction size (if used).
|
||||||
*/
|
*/
|
||||||
private Integer prefetch;
|
private Integer prefetch;
|
||||||
|
|
||||||
/**
|
|
||||||
* Number of messages to be processed in a transaction; number of messages between
|
|
||||||
* acks. For best results it should be less than or equal to the prefetch count -
|
|
||||||
* applies only to simple containers.
|
|
||||||
*/
|
|
||||||
private Integer transactionSize;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether rejected deliveries are requeued by default; default true.
|
* Whether rejected deliveries are requeued by default; default true.
|
||||||
*/
|
*/
|
||||||
|
|
@ -647,14 +640,6 @@ public class RabbitProperties {
|
||||||
@NestedConfigurationProperty
|
@NestedConfigurationProperty
|
||||||
private final ListenerRetry retry = new ListenerRetry();
|
private final ListenerRetry retry = new ListenerRetry();
|
||||||
|
|
||||||
public ContainerType getType() {
|
|
||||||
return this.type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setType(ContainerType containerType) {
|
|
||||||
this.type = containerType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isAutoStartup() {
|
public boolean isAutoStartup() {
|
||||||
return this.autoStartup;
|
return this.autoStartup;
|
||||||
}
|
}
|
||||||
|
|
@ -671,30 +656,6 @@ public class RabbitProperties {
|
||||||
this.acknowledgeMode = acknowledgeMode;
|
this.acknowledgeMode = acknowledgeMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getConcurrency() {
|
|
||||||
return this.concurrency;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setConcurrency(Integer concurrency) {
|
|
||||||
this.concurrency = concurrency;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getMaxConcurrency() {
|
|
||||||
return this.maxConcurrency;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMaxConcurrency(Integer maxConcurrency) {
|
|
||||||
this.maxConcurrency = maxConcurrency;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getConsumersPerQueue() {
|
|
||||||
return this.consumersPerQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setConsumersPerQueue(Integer consumersPerQueue) {
|
|
||||||
this.consumersPerQueue = consumersPerQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getPrefetch() {
|
public Integer getPrefetch() {
|
||||||
return this.prefetch;
|
return this.prefetch;
|
||||||
}
|
}
|
||||||
|
|
@ -703,14 +664,6 @@ public class RabbitProperties {
|
||||||
this.prefetch = prefetch;
|
this.prefetch = prefetch;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getTransactionSize() {
|
|
||||||
return this.transactionSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTransactionSize(Integer transactionSize) {
|
|
||||||
this.transactionSize = transactionSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Boolean getDefaultRequeueRejected() {
|
public Boolean getDefaultRequeueRejected() {
|
||||||
return this.defaultRequeueRejected;
|
return this.defaultRequeueRejected;
|
||||||
}
|
}
|
||||||
|
|
@ -733,6 +686,74 @@ public class RabbitProperties {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration properties for {@code SimpleMessageListenerContainer}.
|
||||||
|
*/
|
||||||
|
public static class SimpleContainer extends AmqpContainer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Minimum number of listener invoker threads.
|
||||||
|
*/
|
||||||
|
private Integer concurrency;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum number of listener invoker threads.
|
||||||
|
*/
|
||||||
|
private Integer maxConcurrency;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of messages to be processed in a transaction; number of messages
|
||||||
|
* between acks. For best results it should
|
||||||
|
* be less than or equal to the prefetch count.
|
||||||
|
*/
|
||||||
|
private Integer transactionSize;
|
||||||
|
|
||||||
|
public Integer getConcurrency() {
|
||||||
|
return this.concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConcurrency(Integer concurrency) {
|
||||||
|
this.concurrency = concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getMaxConcurrency() {
|
||||||
|
return this.maxConcurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxConcurrency(Integer maxConcurrency) {
|
||||||
|
this.maxConcurrency = maxConcurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getTransactionSize() {
|
||||||
|
return this.transactionSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTransactionSize(Integer transactionSize) {
|
||||||
|
this.transactionSize = transactionSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration properties for {@code DirectMessageListenerContainer}.
|
||||||
|
*/
|
||||||
|
public static class DirectContainer extends AmqpContainer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of consumers per queue.
|
||||||
|
*/
|
||||||
|
private Integer consumersPerQueue;
|
||||||
|
|
||||||
|
public Integer getConsumersPerQueue() {
|
||||||
|
return this.consumersPerQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConsumersPerQueue(Integer consumersPerQueue) {
|
||||||
|
this.consumersPerQueue = consumersPerQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public static class Template {
|
public static class Template {
|
||||||
|
|
||||||
@NestedConfigurationProperty
|
@NestedConfigurationProperty
|
||||||
|
|
|
||||||
|
|
@ -17,10 +17,10 @@
|
||||||
package org.springframework.boot.autoconfigure.amqp;
|
package org.springframework.boot.autoconfigure.amqp;
|
||||||
|
|
||||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
* Configure {@link SimpleRabbitListenerContainerFactoryConfigurer} with sensible defaults.
|
||||||
*
|
*
|
||||||
* @author Stephane Nicoll
|
* @author Stephane Nicoll
|
||||||
* @author Gary Russell
|
* @author Gary Russell
|
||||||
|
|
@ -30,16 +30,18 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer
|
||||||
extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {
|
extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure(SimpleRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) {
|
public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
|
||||||
RabbitProperties.Listener listenerConfig = rabbitProperties.getListener();
|
RabbitProperties.SimpleContainer config = getRabbitProperties().getListener()
|
||||||
if (listenerConfig.getConcurrency() != null) {
|
.getSimple();
|
||||||
factory.setConcurrentConsumers(listenerConfig.getConcurrency());
|
configure(factory, connectionFactory, config);
|
||||||
|
if (config.getConcurrency() != null) {
|
||||||
|
factory.setConcurrentConsumers(config.getConcurrency());
|
||||||
}
|
}
|
||||||
if (listenerConfig.getMaxConcurrency() != null) {
|
if (config.getMaxConcurrency() != null) {
|
||||||
factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency());
|
factory.setMaxConcurrentConsumers(config.getMaxConcurrency());
|
||||||
}
|
}
|
||||||
if (listenerConfig.getTransactionSize() != null) {
|
if (config.getTransactionSize() != null) {
|
||||||
factory.setTxSize(listenerConfig.getTransactionSize());
|
factory.setTxSize(config.getTransactionSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -348,17 +348,17 @@ public class RabbitAutoConfigurationTests {
|
||||||
load(new Class<?>[] { MessageConvertersConfiguration.class,
|
load(new Class<?>[] { MessageConvertersConfiguration.class,
|
||||||
MessageRecoverersConfiguration.class },
|
MessageRecoverersConfiguration.class },
|
||||||
"spring.rabbitmq.listener.type:direct",
|
"spring.rabbitmq.listener.type:direct",
|
||||||
"spring.rabbitmq.listener.retry.enabled:true",
|
"spring.rabbitmq.listener.direct.retry.enabled:true",
|
||||||
"spring.rabbitmq.listener.retry.maxAttempts:4",
|
"spring.rabbitmq.listener.direct.retry.maxAttempts:4",
|
||||||
"spring.rabbitmq.listener.retry.initialInterval:2000",
|
"spring.rabbitmq.listener.direct.retry.initialInterval:2000",
|
||||||
"spring.rabbitmq.listener.retry.multiplier:1.5",
|
"spring.rabbitmq.listener.direct.retry.multiplier:1.5",
|
||||||
"spring.rabbitmq.listener.retry.maxInterval:5000",
|
"spring.rabbitmq.listener.direct.retry.maxInterval:5000",
|
||||||
"spring.rabbitmq.listener.autoStartup:false",
|
"spring.rabbitmq.listener.direct.autoStartup:false",
|
||||||
"spring.rabbitmq.listener.acknowledgeMode:manual",
|
"spring.rabbitmq.listener.direct.acknowledgeMode:manual",
|
||||||
"spring.rabbitmq.listener.consumers-per-queue:5",
|
"spring.rabbitmq.listener.direct.consumers-per-queue:5",
|
||||||
"spring.rabbitmq.listener.prefetch:40",
|
"spring.rabbitmq.listener.direct.prefetch:40",
|
||||||
"spring.rabbitmq.listener.defaultRequeueRejected:false",
|
"spring.rabbitmq.listener.direct.defaultRequeueRejected:false",
|
||||||
"spring.rabbitmq.listener.idleEventInterval:5");
|
"spring.rabbitmq.listener.direct.idleEventInterval:5");
|
||||||
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
|
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
|
||||||
.getBean("rabbitListenerContainerFactory",
|
.getBean("rabbitListenerContainerFactory",
|
||||||
DirectRabbitListenerContainerFactory.class);
|
DirectRabbitListenerContainerFactory.class);
|
||||||
|
|
@ -367,6 +367,52 @@ public class RabbitAutoConfigurationTests {
|
||||||
checkCommonProps(dfa);
|
checkCommonProps(dfa);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRabbitListenerContainerFactoryConfigurersAreAvailable() {
|
||||||
|
load(TestConfiguration.class,
|
||||||
|
"spring.rabbitmq.listener.simple.concurrency:5",
|
||||||
|
"spring.rabbitmq.listener.simple.maxConcurrency:10",
|
||||||
|
"spring.rabbitmq.listener.simple.prefetch:40",
|
||||||
|
"spring.rabbitmq.listener.direct.consumers-per-queue:5",
|
||||||
|
"spring.rabbitmq.listener.direct.prefetch:40");
|
||||||
|
assertThat(this.context.getBeansOfType(
|
||||||
|
SimpleRabbitListenerContainerFactoryConfigurer.class)).hasSize(1);
|
||||||
|
assertThat(this.context.getBeansOfType(
|
||||||
|
DirectRabbitListenerContainerFactoryConfigurer.class)).hasSize(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleRabbitListenerContainerFactoryConfigurerUsesConfig() {
|
||||||
|
load(TestConfiguration.class,
|
||||||
|
"spring.rabbitmq.listener.type:direct", // listener type is irrelevant
|
||||||
|
"spring.rabbitmq.listener.simple.concurrency:5",
|
||||||
|
"spring.rabbitmq.listener.simple.maxConcurrency:10",
|
||||||
|
"spring.rabbitmq.listener.simple.prefetch:40");
|
||||||
|
SimpleRabbitListenerContainerFactoryConfigurer configurer = this.context
|
||||||
|
.getBean(SimpleRabbitListenerContainerFactoryConfigurer.class);
|
||||||
|
SimpleRabbitListenerContainerFactory factory =
|
||||||
|
mock(SimpleRabbitListenerContainerFactory.class);
|
||||||
|
configurer.configure(factory, mock(ConnectionFactory.class));
|
||||||
|
verify(factory).setConcurrentConsumers(5);
|
||||||
|
verify(factory).setMaxConcurrentConsumers(10);
|
||||||
|
verify(factory).setPrefetchCount(40);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() {
|
||||||
|
load(TestConfiguration.class,
|
||||||
|
"spring.rabbitmq.listener.type:simple", // listener type is irrelevant
|
||||||
|
"spring.rabbitmq.listener.direct.consumers-per-queue:5",
|
||||||
|
"spring.rabbitmq.listener.direct.prefetch:40");
|
||||||
|
DirectRabbitListenerContainerFactoryConfigurer configurer = this.context
|
||||||
|
.getBean(DirectRabbitListenerContainerFactoryConfigurer.class);
|
||||||
|
DirectRabbitListenerContainerFactory factory =
|
||||||
|
mock(DirectRabbitListenerContainerFactory.class);
|
||||||
|
configurer.configure(factory, mock(ConnectionFactory.class));
|
||||||
|
verify(factory).setConsumersPerQueue(5);
|
||||||
|
verify(factory).setPrefetchCount(40);
|
||||||
|
}
|
||||||
|
|
||||||
private void checkCommonProps(DirectFieldAccessor dfa) {
|
private void checkCommonProps(DirectFieldAccessor dfa) {
|
||||||
assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE);
|
assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE);
|
||||||
assertThat(dfa.getPropertyValue("acknowledgeMode"))
|
assertThat(dfa.getPropertyValue("acknowledgeMode"))
|
||||||
|
|
|
||||||
|
|
@ -975,13 +975,13 @@ content into your application; rather pick only the properties that you need.
|
||||||
spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite.
|
spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite.
|
||||||
spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean.
|
spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean.
|
||||||
spring.rabbitmq.host=localhost # RabbitMQ host.
|
spring.rabbitmq.host=localhost # RabbitMQ host.
|
||||||
|
spring.rabbitmq.listener.direct.consumers-per-queue= # Number of consumers per queue.
|
||||||
spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container.
|
spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container.
|
||||||
spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup.
|
spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup.
|
||||||
spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads for a `simple` container.
|
spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads.
|
||||||
spring.rabbitmq.listener.consumers-per-queue= # The number of Consumers per queue for a `direct` container.
|
spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures.
|
||||||
spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`.
|
|
||||||
spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds.
|
spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds.
|
||||||
spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker for a `simple` container.
|
spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker.
|
||||||
spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
|
spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
|
||||||
spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled.
|
spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled.
|
||||||
spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message.
|
spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message.
|
||||||
|
|
@ -989,8 +989,8 @@ content into your application; rather pick only the properties that you need.
|
||||||
spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts.
|
spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts.
|
||||||
spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval.
|
spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval.
|
||||||
spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful.
|
spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful.
|
||||||
spring.rabbitmq.listener.simple.transaction-size= Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count - applies only to `simple` containers.
|
spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count.
|
||||||
spring.rabbitmq.listener.type= # The listener container type `simple` or `direct`; default `simple`.
|
spring.rabbitmq.listener.type=simple # Listener container type.
|
||||||
spring.rabbitmq.password= # Login to authenticate against the broker.
|
spring.rabbitmq.password= # Login to authenticate against the broker.
|
||||||
spring.rabbitmq.port=5672 # RabbitMQ port.
|
spring.rabbitmq.port=5672 # RabbitMQ port.
|
||||||
spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.
|
spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.
|
||||||
|
|
|
||||||
|
|
@ -4653,9 +4653,10 @@ the broker connection is lost. Retries are disabled by default.
|
||||||
==== Receiving a message
|
==== Receiving a message
|
||||||
When the Rabbit infrastructure is present, any bean can be annotated with
|
When the Rabbit infrastructure is present, any bean can be annotated with
|
||||||
`@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory`
|
`@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory`
|
||||||
has been defined, a default one is configured automatically. If a `MessageConverter` or
|
has been defined, a default `SimpleRabbitListenerContainerFactory` is configured
|
||||||
`MessageRecoverer` beans are defined, they are associated automatically to the default
|
automatically and you can switch to a direct container using the
|
||||||
factory.
|
`spring.rabbitmq.listener.type` property. If a `MessageConverter` or `MessageRecoverer`
|
||||||
|
beans are defined, they are associated automatically to the default factory.
|
||||||
|
|
||||||
The following component creates a listener endpoint on the `someQueue` queue:
|
The following component creates a listener endpoint on the `someQueue` queue:
|
||||||
|
|
||||||
|
|
@ -4677,9 +4678,13 @@ for more details.
|
||||||
|
|
||||||
If you need to create more `RabbitListenerContainerFactory` instances or if you want to
|
If you need to create more `RabbitListenerContainerFactory` instances or if you want to
|
||||||
override the default, Spring Boot provides a
|
override the default, Spring Boot provides a
|
||||||
`SimpleRabbitListenerContainerFactoryConfigurer` that you can use to initialize a
|
`SimpleRabbitListenerContainerFactoryConfigurer` and
|
||||||
`SimpleRabbitListenerContainerFactory` with the same settings as the one that is
|
`DirectRabbitListenerContainerFactoryConfigurer` that you can use to initialize a
|
||||||
auto-configured.
|
`SimpleRabbitListenerContainerFactory` and `DirectRabbitListenerContainerFactory` with the
|
||||||
|
same settings as the one used by the auto-configuration.
|
||||||
|
|
||||||
|
TIP: It doesn't matter which container type you've chosen, those two beans are exposed by
|
||||||
|
the auto-configuration.
|
||||||
|
|
||||||
For instance, the following exposes another factory that uses a specific
|
For instance, the following exposes another factory that uses a specific
|
||||||
`MessageConverter`:
|
`MessageConverter`:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue