From 6eddf1b372dd8ef87499ec381d05458adeff8577 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 1 May 2017 16:18:59 -0400 Subject: [PATCH] Support direct AMQP container Add support for auto configuration - select container type and separate discrete properties. See gh-9055 --- ...bitListenerContainerFactoryConfigurer.java | 124 ++++++++++++++++++ ...bitListenerContainerFactoryConfigurer.java | 39 ++++++ .../RabbitAnnotationDrivenConfiguration.java | 84 +++++++++--- .../autoconfigure/amqp/RabbitProperties.java | 51 ++++++- ...bitListenerContainerFactoryConfigurer.java | 100 ++------------ .../amqp/RabbitAutoConfigurationTests.java | 31 ++++- .../appendix-application-properties.adoc | 8 +- 7 files changed, 319 insertions(+), 118 deletions(-) create mode 100644 spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java create mode 100644 spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java new file mode 100644 index 00000000000..fc2baffa3c3 --- /dev/null +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -0,0 +1,124 @@ +/* + * Copyright 2012-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry; +import org.springframework.util.Assert; + +/** + * Configure {@link RabbitListenerContainerFactory} with sensible defaults. + * + * @param the container factory type. + * + * @author Gary Russell + * @since 2.0 + * + */ +public abstract class AbstractRabbitListenerContainerFactoryConfigurer< + T extends AbstractRabbitListenerContainerFactory> { + + private MessageConverter messageConverter; + + private MessageRecoverer messageRecoverer; + + private RabbitProperties rabbitProperties; + + /** + * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box + * converter should be used. + * @param messageConverter the {@link MessageConverter} + */ + protected void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + /** + * Set the {@link MessageRecoverer} to use or {@code null} to rely on the default. + * @param messageRecoverer the {@link MessageRecoverer} + */ + protected void setMessageRecoverer(MessageRecoverer messageRecoverer) { + this.messageRecoverer = messageRecoverer; + } + + /** + * Set the {@link RabbitProperties} to use. + * @param rabbitProperties the {@link RabbitProperties} + */ + protected void setRabbitProperties(RabbitProperties rabbitProperties) { + this.rabbitProperties = rabbitProperties; + } + + /** + * Configure the specified rabbit listener container factory. The factory can be + * further tuned and default settings can be overridden. + * @param factory the {@link AbstractRabbitListenerContainerFactory} instance to + * configure + * @param connectionFactory the {@link ConnectionFactory} to use + */ + public final void configure(T factory, ConnectionFactory connectionFactory) { + Assert.notNull(factory, "Factory must not be null"); + Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); + factory.setConnectionFactory(connectionFactory); + if (this.messageConverter != null) { + factory.setMessageConverter(this.messageConverter); + } + RabbitProperties.Listener listenerConfig = this.rabbitProperties.getListener(); + factory.setAutoStartup(listenerConfig.isAutoStartup()); + if (listenerConfig.getAcknowledgeMode() != null) { + factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode()); + } + if (listenerConfig.getPrefetch() != null) { + factory.setPrefetchCount(listenerConfig.getPrefetch()); + } + if (listenerConfig.getDefaultRequeueRejected() != null) { + factory.setDefaultRequeueRejected(listenerConfig.getDefaultRequeueRejected()); + } + if (listenerConfig.getIdleEventInterval() != null) { + factory.setIdleEventInterval(listenerConfig.getIdleEventInterval()); + } + ListenerRetry retryConfig = listenerConfig.getRetry(); + if (retryConfig.isEnabled()) { + RetryInterceptorBuilder builder = (retryConfig.isStateless() + ? RetryInterceptorBuilder.stateless() + : RetryInterceptorBuilder.stateful()); + builder.maxAttempts(retryConfig.getMaxAttempts()); + builder.backOffOptions(retryConfig.getInitialInterval(), + retryConfig.getMultiplier(), retryConfig.getMaxInterval()); + MessageRecoverer recoverer = (this.messageRecoverer != null + ? this.messageRecoverer : new RejectAndDontRequeueRecoverer()); + builder.recoverer(recoverer); + factory.setAdviceChain(builder.build()); + } + configure(factory, this.rabbitProperties); + } + + /** + * Perform factory-specific configuration. + * + * @param factory the factory. + * @param rabbitProperties the properties. + */ + protected abstract void configure(T factory, RabbitProperties rabbitProperties); + +} diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java new file mode 100644 index 00000000000..4a7026210bc --- /dev/null +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2012-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; + +/** + * Configure {@link RabbitListenerContainerFactory} with sensible defaults. + * + * @author Gary Russell + * @since 2.0 + */ +public final class DirectRabbitListenerContainerFactoryConfigurer + extends AbstractRabbitListenerContainerFactoryConfigurer { + + @Override + protected void configure(DirectRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) { + RabbitProperties.Listener listenerConfig = rabbitProperties.getListener(); + if (listenerConfig.getConsumersPerQueue() != null) { + factory.setConsumersPerQueue(listenerConfig.getConsumersPerQueue()); + } + } + +} diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 18c107ca2d7..b7504546018 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.amqp; import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; @@ -25,6 +26,7 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -39,11 +41,11 @@ import org.springframework.context.annotation.Configuration; @ConditionalOnClass(EnableRabbit.class) class RabbitAnnotationDrivenConfiguration { - private final ObjectProvider messageConverter; + protected final ObjectProvider messageConverter; - private final ObjectProvider messageRecoverer; + protected final ObjectProvider messageRecoverer; - private final RabbitProperties properties; + protected final RabbitProperties properties; RabbitAnnotationDrivenConfiguration(ObjectProvider messageConverter, ObjectProvider messageRecoverer, @@ -53,24 +55,68 @@ class RabbitAnnotationDrivenConfiguration { this.properties = properties; } - @Bean - @ConditionalOnMissingBean - public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { - SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer(); - configurer.setMessageConverter(this.messageConverter.getIfUnique()); - configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); - configurer.setRabbitProperties(this.properties); - return configurer; + + @Configuration + @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "simple", + matchIfMissing = true) + public static class SimpleContainerConfiguration extends RabbitAnnotationDrivenConfiguration { + + SimpleContainerConfiguration(ObjectProvider messageConverter, + ObjectProvider messageRecoverer, RabbitProperties properties) { + super(messageConverter, messageRecoverer, properties); + } + + @Bean + @ConditionalOnMissingBean + public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { + SimpleRabbitListenerContainerFactoryConfigurer configurer = + new SimpleRabbitListenerContainerFactoryConfigurer(); + configurer.setMessageConverter(this.messageConverter.getIfUnique()); + configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); + configurer.setRabbitProperties(this.properties); + return configurer; + } + + @Bean + @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + SimpleRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; + } } - @Bean - @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( - SimpleRabbitListenerContainerFactoryConfigurer configurer, - ConnectionFactory connectionFactory) { - SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); - configurer.configure(factory, connectionFactory); - return factory; + @Configuration + @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "direct") + public static class DirectContainerConfiguration extends RabbitAnnotationDrivenConfiguration { + + DirectContainerConfiguration(ObjectProvider messageConverter, + ObjectProvider messageRecoverer, RabbitProperties properties) { + super(messageConverter, messageRecoverer, properties); + } + + @Bean + @ConditionalOnMissingBean + public DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { + DirectRabbitListenerContainerFactoryConfigurer configurer = + new DirectRabbitListenerContainerFactoryConfigurer(); + configurer.setMessageConverter(this.messageConverter.getIfUnique()); + configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); + configurer.setRabbitProperties(this.properties); + return configurer; + } + + @Bean + @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") + public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory( + DirectRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; + } } @EnableRabbit diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index f5b8f883cdc..a8ca050c995 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -40,6 +40,22 @@ import org.springframework.util.StringUtils; @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { + public enum ContainerType { + + /** + * SimpleMessageListenerContainer - legacy container where the RabbitMQ consumer + * dispatches messages to an invoker thread. + */ + SIMPLE, + + /** + * DirectMessageListenerContainer - container where the listener is invoked + * directly on the RabbitMQ consumer thread. + */ + DIRECT + + } + /** * RabbitMQ host. */ @@ -572,6 +588,11 @@ public class RabbitProperties { public static class AmqpContainer { + /** + * Container type. + */ + private ContainerType type = ContainerType.SIMPLE; + /** * Start the container automatically on startup. */ @@ -583,15 +604,20 @@ public class RabbitProperties { private AcknowledgeMode acknowledgeMode; /** - * Minimum number of consumers. + * Minimum number of listener invoker threads - applies only to simple containers. */ private Integer concurrency; /** - * Maximum number of consumers. + * Maximum number of listener invoker threads - applies only to simple containers. */ private Integer maxConcurrency; + /** + * Number of RabbitMQ consumers per queue - applies only to direct containers. + */ + private Integer consumersPerQueue; + /** * Number of messages to be handled in a single request. It should be greater than * or equal to the transaction size (if used). @@ -599,8 +625,9 @@ public class RabbitProperties { private Integer prefetch; /** - * Number of messages to be processed in a transaction. For best results it should - * be less than or equal to the prefetch count. + * Number of messages to be processed in a transaction; number of messages between + * acks. For best results it should be less than or equal to the prefetch count - + * applies only to simple containers. */ private Integer transactionSize; @@ -620,6 +647,14 @@ public class RabbitProperties { @NestedConfigurationProperty private final ListenerRetry retry = new ListenerRetry(); + public ContainerType getType() { + return this.type; + } + + public void setType(ContainerType containerType) { + this.type = containerType; + } + public boolean isAutoStartup() { return this.autoStartup; } @@ -652,6 +687,14 @@ public class RabbitProperties { this.maxConcurrency = maxConcurrency; } + public Integer getConsumersPerQueue() { + return this.consumersPerQueue; + } + + public void setConsumersPerQueue(Integer consumersPerQueue) { + this.consumersPerQueue = consumersPerQueue; + } + public Integer getPrefetch() { return this.prefetch; } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java index 52b6509039f..8d2845c38b3 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java @@ -16,15 +16,8 @@ package org.springframework.boot.autoconfigure.amqp; -import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.retry.MessageRecoverer; -import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; -import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry; -import org.springframework.util.Assert; /** * Configure {@link RabbitListenerContainerFactory} with sensible defaults. @@ -33,92 +26,21 @@ import org.springframework.util.Assert; * @author Gary Russell * @since 1.3.3 */ -public final class SimpleRabbitListenerContainerFactoryConfigurer { +public final class SimpleRabbitListenerContainerFactoryConfigurer + extends AbstractRabbitListenerContainerFactoryConfigurer { - private MessageConverter messageConverter; - - private MessageRecoverer messageRecoverer; - - private RabbitProperties rabbitProperties; - - /** - * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box - * converter should be used. - * @param messageConverter the {@link MessageConverter} - */ - void setMessageConverter(MessageConverter messageConverter) { - this.messageConverter = messageConverter; - } - - /** - * Set the {@link MessageRecoverer} to use or {@code null} to rely on the default. - * @param messageRecoverer the {@link MessageRecoverer} - */ - void setMessageRecoverer(MessageRecoverer messageRecoverer) { - this.messageRecoverer = messageRecoverer; - } - - /** - * Set the {@link RabbitProperties} to use. - * @param rabbitProperties the {@link RabbitProperties} - */ - void setRabbitProperties(RabbitProperties rabbitProperties) { - this.rabbitProperties = rabbitProperties; - } - - /** - * Configure the specified rabbit listener container factory. The factory can be - * further tuned and default settings can be overridden. - * @param factory the {@link SimpleRabbitListenerContainerFactory} instance to - * configure - * @param connectionFactory the {@link ConnectionFactory} to use - */ - public void configure(SimpleRabbitListenerContainerFactory factory, - ConnectionFactory connectionFactory) { - Assert.notNull(factory, "Factory must not be null"); - Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); - factory.setConnectionFactory(connectionFactory); - if (this.messageConverter != null) { - factory.setMessageConverter(this.messageConverter); + @Override + protected void configure(SimpleRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) { + RabbitProperties.Listener listenerConfig = rabbitProperties.getListener(); + if (listenerConfig.getConcurrency() != null) { + factory.setConcurrentConsumers(listenerConfig.getConcurrency()); } - RabbitProperties.AmqpContainer config = this.rabbitProperties.getListener() - .getSimple(); - factory.setAutoStartup(config.isAutoStartup()); - if (config.getAcknowledgeMode() != null) { - factory.setAcknowledgeMode(config.getAcknowledgeMode()); + if (listenerConfig.getMaxConcurrency() != null) { + factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency()); } - if (config.getConcurrency() != null) { - factory.setConcurrentConsumers(config.getConcurrency()); + if (listenerConfig.getTransactionSize() != null) { + factory.setTxSize(listenerConfig.getTransactionSize()); } - if (config.getMaxConcurrency() != null) { - factory.setMaxConcurrentConsumers(config.getMaxConcurrency()); - } - if (config.getPrefetch() != null) { - factory.setPrefetchCount(config.getPrefetch()); - } - if (config.getTransactionSize() != null) { - factory.setTxSize(config.getTransactionSize()); - } - if (config.getDefaultRequeueRejected() != null) { - factory.setDefaultRequeueRejected(config.getDefaultRequeueRejected()); - } - if (config.getIdleEventInterval() != null) { - factory.setIdleEventInterval(config.getIdleEventInterval()); - } - ListenerRetry retryConfig = config.getRetry(); - if (retryConfig.isEnabled()) { - RetryInterceptorBuilder builder = (retryConfig.isStateless() - ? RetryInterceptorBuilder.stateless() - : RetryInterceptorBuilder.stateful()); - builder.maxAttempts(retryConfig.getMaxAttempts()); - builder.backOffOptions(retryConfig.getInitialInterval(), - retryConfig.getMultiplier(), retryConfig.getMaxInterval()); - MessageRecoverer recoverer = (this.messageRecoverer != null - ? this.messageRecoverer : new RejectAndDontRequeueRecoverer()); - builder.recoverer(recoverer); - factory.setAdviceChain(builder.build()); - } - } } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index d1a056f74d8..c1c2b23af03 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -30,6 +30,7 @@ import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; @@ -336,6 +337,33 @@ public class RabbitAutoConfigurationTests { .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory); + assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5); + assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10); + assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20); + checkCommonProps(dfa); + } + + @Test + public void testDirectRabbitListenerContainerFactoryWithCustomSettings() { + load(new Class[] { MessageConvertersConfiguration.class, + MessageRecoverersConfiguration.class }, + "spring.rabbitmq.listener.type:direct", + "spring.rabbitmq.listener.retry.enabled:true", + "spring.rabbitmq.listener.retry.maxAttempts:4", + "spring.rabbitmq.listener.retry.initialInterval:2000", + "spring.rabbitmq.listener.retry.multiplier:1.5", + "spring.rabbitmq.listener.retry.maxInterval:5000", + "spring.rabbitmq.listener.autoStartup:false", + "spring.rabbitmq.listener.acknowledgeMode:manual", + "spring.rabbitmq.listener.consumers-per-queue:5", + "spring.rabbitmq.listener.prefetch:40", + "spring.rabbitmq.listener.defaultRequeueRejected:false", + "spring.rabbitmq.listener.idleEventInterval:5"); + DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context + .getBean("rabbitListenerContainerFactory", + DirectRabbitListenerContainerFactory.class); + DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory); + assertThat(dfa.getPropertyValue("consumersPerQueue")).isEqualTo(5); checkCommonProps(dfa); } @@ -343,10 +371,7 @@ public class RabbitAutoConfigurationTests { assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE); assertThat(dfa.getPropertyValue("acknowledgeMode")) .isEqualTo(AcknowledgeMode.MANUAL); - assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5); - assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10); assertThat(dfa.getPropertyValue("prefetchCount")).isEqualTo(40); - assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20); assertThat(dfa.getPropertyValue("messageConverter")) .isSameAs(this.context.getBean("myMessageConverter")); assertThat(dfa.getPropertyValue("defaultRequeueRejected")) diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index f6166bbc17d..c601a48ea11 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -977,10 +977,11 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.host=localhost # RabbitMQ host. spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container. spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup. - spring.rabbitmq.listener.simple.concurrency= # Minimum number of consumers. + spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads for a `simple` container. + spring.rabbitmq.listener.consumers-per-queue= # The number of Consumers per queue for a `direct` container. spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`. spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds. - spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of consumers. + spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker for a `simple` container. spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used). spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled. spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message. @@ -988,7 +989,8 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts. spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval. spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful. - spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count. + spring.rabbitmq.listener.simple.transaction-size= Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count - applies only to `simple` containers. + spring.rabbitmq.listener.type= # The listener container type `simple` or `direct`; default `simple`. spring.rabbitmq.password= # Login to authenticate against the broker. spring.rabbitmq.port=5672 # RabbitMQ port. spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.