Support direct AMQP container
Add support for auto configuration - select container type and separate discrete properties. See gh-9055
This commit is contained in:
parent
0f38031f93
commit
6eddf1b372
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Copyright 2012-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
|
||||
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
||||
*
|
||||
* @param <T> the container factory type.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 2.0
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractRabbitListenerContainerFactoryConfigurer<
|
||||
T extends AbstractRabbitListenerContainerFactory<?>> {
|
||||
|
||||
private MessageConverter messageConverter;
|
||||
|
||||
private MessageRecoverer messageRecoverer;
|
||||
|
||||
private RabbitProperties rabbitProperties;
|
||||
|
||||
/**
|
||||
* Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box
|
||||
* converter should be used.
|
||||
* @param messageConverter the {@link MessageConverter}
|
||||
*/
|
||||
protected void setMessageConverter(MessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link MessageRecoverer} to use or {@code null} to rely on the default.
|
||||
* @param messageRecoverer the {@link MessageRecoverer}
|
||||
*/
|
||||
protected void setMessageRecoverer(MessageRecoverer messageRecoverer) {
|
||||
this.messageRecoverer = messageRecoverer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link RabbitProperties} to use.
|
||||
* @param rabbitProperties the {@link RabbitProperties}
|
||||
*/
|
||||
protected void setRabbitProperties(RabbitProperties rabbitProperties) {
|
||||
this.rabbitProperties = rabbitProperties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the specified rabbit listener container factory. The factory can be
|
||||
* further tuned and default settings can be overridden.
|
||||
* @param factory the {@link AbstractRabbitListenerContainerFactory} instance to
|
||||
* configure
|
||||
* @param connectionFactory the {@link ConnectionFactory} to use
|
||||
*/
|
||||
public final void configure(T factory, ConnectionFactory connectionFactory) {
|
||||
Assert.notNull(factory, "Factory must not be null");
|
||||
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
|
||||
factory.setConnectionFactory(connectionFactory);
|
||||
if (this.messageConverter != null) {
|
||||
factory.setMessageConverter(this.messageConverter);
|
||||
}
|
||||
RabbitProperties.Listener listenerConfig = this.rabbitProperties.getListener();
|
||||
factory.setAutoStartup(listenerConfig.isAutoStartup());
|
||||
if (listenerConfig.getAcknowledgeMode() != null) {
|
||||
factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode());
|
||||
}
|
||||
if (listenerConfig.getPrefetch() != null) {
|
||||
factory.setPrefetchCount(listenerConfig.getPrefetch());
|
||||
}
|
||||
if (listenerConfig.getDefaultRequeueRejected() != null) {
|
||||
factory.setDefaultRequeueRejected(listenerConfig.getDefaultRequeueRejected());
|
||||
}
|
||||
if (listenerConfig.getIdleEventInterval() != null) {
|
||||
factory.setIdleEventInterval(listenerConfig.getIdleEventInterval());
|
||||
}
|
||||
ListenerRetry retryConfig = listenerConfig.getRetry();
|
||||
if (retryConfig.isEnabled()) {
|
||||
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
|
||||
? RetryInterceptorBuilder.stateless()
|
||||
: RetryInterceptorBuilder.stateful());
|
||||
builder.maxAttempts(retryConfig.getMaxAttempts());
|
||||
builder.backOffOptions(retryConfig.getInitialInterval(),
|
||||
retryConfig.getMultiplier(), retryConfig.getMaxInterval());
|
||||
MessageRecoverer recoverer = (this.messageRecoverer != null
|
||||
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
|
||||
builder.recoverer(recoverer);
|
||||
factory.setAdviceChain(builder.build());
|
||||
}
|
||||
configure(factory, this.rabbitProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform factory-specific configuration.
|
||||
*
|
||||
* @param factory the factory.
|
||||
* @param rabbitProperties the properties.
|
||||
*/
|
||||
protected abstract void configure(T factory, RabbitProperties rabbitProperties);
|
||||
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright 2012-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
||||
|
||||
/**
|
||||
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 2.0
|
||||
*/
|
||||
public final class DirectRabbitListenerContainerFactoryConfigurer
|
||||
extends AbstractRabbitListenerContainerFactoryConfigurer<DirectRabbitListenerContainerFactory> {
|
||||
|
||||
@Override
|
||||
protected void configure(DirectRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) {
|
||||
RabbitProperties.Listener listenerConfig = rabbitProperties.getListener();
|
||||
if (listenerConfig.getConsumersPerQueue() != null) {
|
||||
factory.setConsumersPerQueue(listenerConfig.getConsumersPerQueue());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
|
@ -25,6 +26,7 @@ import org.springframework.amqp.support.converter.MessageConverter;
|
|||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
|
@ -39,11 +41,11 @@ import org.springframework.context.annotation.Configuration;
|
|||
@ConditionalOnClass(EnableRabbit.class)
|
||||
class RabbitAnnotationDrivenConfiguration {
|
||||
|
||||
private final ObjectProvider<MessageConverter> messageConverter;
|
||||
protected final ObjectProvider<MessageConverter> messageConverter;
|
||||
|
||||
private final ObjectProvider<MessageRecoverer> messageRecoverer;
|
||||
protected final ObjectProvider<MessageRecoverer> messageRecoverer;
|
||||
|
||||
private final RabbitProperties properties;
|
||||
protected final RabbitProperties properties;
|
||||
|
||||
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
|
||||
ObjectProvider<MessageRecoverer> messageRecoverer,
|
||||
|
@ -53,24 +55,68 @@ class RabbitAnnotationDrivenConfiguration {
|
|||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
|
||||
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
|
||||
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
||||
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
|
||||
configurer.setRabbitProperties(this.properties);
|
||||
return configurer;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "simple",
|
||||
matchIfMissing = true)
|
||||
public static class SimpleContainerConfiguration extends RabbitAnnotationDrivenConfiguration {
|
||||
|
||||
SimpleContainerConfiguration(ObjectProvider<MessageConverter> messageConverter,
|
||||
ObjectProvider<MessageRecoverer> messageRecoverer, RabbitProperties properties) {
|
||||
super(messageConverter, messageRecoverer, properties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
|
||||
SimpleRabbitListenerContainerFactoryConfigurer configurer =
|
||||
new SimpleRabbitListenerContainerFactoryConfigurer();
|
||||
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
||||
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
|
||||
configurer.setRabbitProperties(this.properties);
|
||||
return configurer;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
|
||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
|
||||
SimpleRabbitListenerContainerFactoryConfigurer configurer,
|
||||
ConnectionFactory connectionFactory) {
|
||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||
configurer.configure(factory, connectionFactory);
|
||||
return factory;
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
|
||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
|
||||
SimpleRabbitListenerContainerFactoryConfigurer configurer,
|
||||
ConnectionFactory connectionFactory) {
|
||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||
configurer.configure(factory, connectionFactory);
|
||||
return factory;
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "direct")
|
||||
public static class DirectContainerConfiguration extends RabbitAnnotationDrivenConfiguration {
|
||||
|
||||
DirectContainerConfiguration(ObjectProvider<MessageConverter> messageConverter,
|
||||
ObjectProvider<MessageRecoverer> messageRecoverer, RabbitProperties properties) {
|
||||
super(messageConverter, messageRecoverer, properties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
|
||||
DirectRabbitListenerContainerFactoryConfigurer configurer =
|
||||
new DirectRabbitListenerContainerFactoryConfigurer();
|
||||
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
||||
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
|
||||
configurer.setRabbitProperties(this.properties);
|
||||
return configurer;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
|
||||
public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory(
|
||||
DirectRabbitListenerContainerFactoryConfigurer configurer,
|
||||
ConnectionFactory connectionFactory) {
|
||||
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
|
||||
configurer.configure(factory, connectionFactory);
|
||||
return factory;
|
||||
}
|
||||
}
|
||||
|
||||
@EnableRabbit
|
||||
|
|
|
@ -40,6 +40,22 @@ import org.springframework.util.StringUtils;
|
|||
@ConfigurationProperties(prefix = "spring.rabbitmq")
|
||||
public class RabbitProperties {
|
||||
|
||||
public enum ContainerType {
|
||||
|
||||
/**
|
||||
* SimpleMessageListenerContainer - legacy container where the RabbitMQ consumer
|
||||
* dispatches messages to an invoker thread.
|
||||
*/
|
||||
SIMPLE,
|
||||
|
||||
/**
|
||||
* DirectMessageListenerContainer - container where the listener is invoked
|
||||
* directly on the RabbitMQ consumer thread.
|
||||
*/
|
||||
DIRECT
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* RabbitMQ host.
|
||||
*/
|
||||
|
@ -572,6 +588,11 @@ public class RabbitProperties {
|
|||
|
||||
public static class AmqpContainer {
|
||||
|
||||
/**
|
||||
* Container type.
|
||||
*/
|
||||
private ContainerType type = ContainerType.SIMPLE;
|
||||
|
||||
/**
|
||||
* Start the container automatically on startup.
|
||||
*/
|
||||
|
@ -583,15 +604,20 @@ public class RabbitProperties {
|
|||
private AcknowledgeMode acknowledgeMode;
|
||||
|
||||
/**
|
||||
* Minimum number of consumers.
|
||||
* Minimum number of listener invoker threads - applies only to simple containers.
|
||||
*/
|
||||
private Integer concurrency;
|
||||
|
||||
/**
|
||||
* Maximum number of consumers.
|
||||
* Maximum number of listener invoker threads - applies only to simple containers.
|
||||
*/
|
||||
private Integer maxConcurrency;
|
||||
|
||||
/**
|
||||
* Number of RabbitMQ consumers per queue - applies only to direct containers.
|
||||
*/
|
||||
private Integer consumersPerQueue;
|
||||
|
||||
/**
|
||||
* Number of messages to be handled in a single request. It should be greater than
|
||||
* or equal to the transaction size (if used).
|
||||
|
@ -599,8 +625,9 @@ public class RabbitProperties {
|
|||
private Integer prefetch;
|
||||
|
||||
/**
|
||||
* Number of messages to be processed in a transaction. For best results it should
|
||||
* be less than or equal to the prefetch count.
|
||||
* Number of messages to be processed in a transaction; number of messages between
|
||||
* acks. For best results it should be less than or equal to the prefetch count -
|
||||
* applies only to simple containers.
|
||||
*/
|
||||
private Integer transactionSize;
|
||||
|
||||
|
@ -620,6 +647,14 @@ public class RabbitProperties {
|
|||
@NestedConfigurationProperty
|
||||
private final ListenerRetry retry = new ListenerRetry();
|
||||
|
||||
public ContainerType getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public void setType(ContainerType containerType) {
|
||||
this.type = containerType;
|
||||
}
|
||||
|
||||
public boolean isAutoStartup() {
|
||||
return this.autoStartup;
|
||||
}
|
||||
|
@ -652,6 +687,14 @@ public class RabbitProperties {
|
|||
this.maxConcurrency = maxConcurrency;
|
||||
}
|
||||
|
||||
public Integer getConsumersPerQueue() {
|
||||
return this.consumersPerQueue;
|
||||
}
|
||||
|
||||
public void setConsumersPerQueue(Integer consumersPerQueue) {
|
||||
this.consumersPerQueue = consumersPerQueue;
|
||||
}
|
||||
|
||||
public Integer getPrefetch() {
|
||||
return this.prefetch;
|
||||
}
|
||||
|
|
|
@ -16,15 +16,8 @@
|
|||
|
||||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
|
||||
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
||||
|
@ -33,92 +26,21 @@ import org.springframework.util.Assert;
|
|||
* @author Gary Russell
|
||||
* @since 1.3.3
|
||||
*/
|
||||
public final class SimpleRabbitListenerContainerFactoryConfigurer {
|
||||
public final class SimpleRabbitListenerContainerFactoryConfigurer
|
||||
extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {
|
||||
|
||||
private MessageConverter messageConverter;
|
||||
|
||||
private MessageRecoverer messageRecoverer;
|
||||
|
||||
private RabbitProperties rabbitProperties;
|
||||
|
||||
/**
|
||||
* Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box
|
||||
* converter should be used.
|
||||
* @param messageConverter the {@link MessageConverter}
|
||||
*/
|
||||
void setMessageConverter(MessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link MessageRecoverer} to use or {@code null} to rely on the default.
|
||||
* @param messageRecoverer the {@link MessageRecoverer}
|
||||
*/
|
||||
void setMessageRecoverer(MessageRecoverer messageRecoverer) {
|
||||
this.messageRecoverer = messageRecoverer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link RabbitProperties} to use.
|
||||
* @param rabbitProperties the {@link RabbitProperties}
|
||||
*/
|
||||
void setRabbitProperties(RabbitProperties rabbitProperties) {
|
||||
this.rabbitProperties = rabbitProperties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the specified rabbit listener container factory. The factory can be
|
||||
* further tuned and default settings can be overridden.
|
||||
* @param factory the {@link SimpleRabbitListenerContainerFactory} instance to
|
||||
* configure
|
||||
* @param connectionFactory the {@link ConnectionFactory} to use
|
||||
*/
|
||||
public void configure(SimpleRabbitListenerContainerFactory factory,
|
||||
ConnectionFactory connectionFactory) {
|
||||
Assert.notNull(factory, "Factory must not be null");
|
||||
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
|
||||
factory.setConnectionFactory(connectionFactory);
|
||||
if (this.messageConverter != null) {
|
||||
factory.setMessageConverter(this.messageConverter);
|
||||
@Override
|
||||
protected void configure(SimpleRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) {
|
||||
RabbitProperties.Listener listenerConfig = rabbitProperties.getListener();
|
||||
if (listenerConfig.getConcurrency() != null) {
|
||||
factory.setConcurrentConsumers(listenerConfig.getConcurrency());
|
||||
}
|
||||
RabbitProperties.AmqpContainer config = this.rabbitProperties.getListener()
|
||||
.getSimple();
|
||||
factory.setAutoStartup(config.isAutoStartup());
|
||||
if (config.getAcknowledgeMode() != null) {
|
||||
factory.setAcknowledgeMode(config.getAcknowledgeMode());
|
||||
if (listenerConfig.getMaxConcurrency() != null) {
|
||||
factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency());
|
||||
}
|
||||
if (config.getConcurrency() != null) {
|
||||
factory.setConcurrentConsumers(config.getConcurrency());
|
||||
if (listenerConfig.getTransactionSize() != null) {
|
||||
factory.setTxSize(listenerConfig.getTransactionSize());
|
||||
}
|
||||
if (config.getMaxConcurrency() != null) {
|
||||
factory.setMaxConcurrentConsumers(config.getMaxConcurrency());
|
||||
}
|
||||
if (config.getPrefetch() != null) {
|
||||
factory.setPrefetchCount(config.getPrefetch());
|
||||
}
|
||||
if (config.getTransactionSize() != null) {
|
||||
factory.setTxSize(config.getTransactionSize());
|
||||
}
|
||||
if (config.getDefaultRequeueRejected() != null) {
|
||||
factory.setDefaultRequeueRejected(config.getDefaultRequeueRejected());
|
||||
}
|
||||
if (config.getIdleEventInterval() != null) {
|
||||
factory.setIdleEventInterval(config.getIdleEventInterval());
|
||||
}
|
||||
ListenerRetry retryConfig = config.getRetry();
|
||||
if (retryConfig.isEnabled()) {
|
||||
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
|
||||
? RetryInterceptorBuilder.stateless()
|
||||
: RetryInterceptorBuilder.stateful());
|
||||
builder.maxAttempts(retryConfig.getMaxAttempts());
|
||||
builder.backOffOptions(retryConfig.getInitialInterval(),
|
||||
retryConfig.getMultiplier(), retryConfig.getMaxInterval());
|
||||
MessageRecoverer recoverer = (this.messageRecoverer != null
|
||||
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
|
||||
builder.recoverer(recoverer);
|
||||
factory.setAdviceChain(builder.build());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.springframework.amqp.core.AcknowledgeMode;
|
|||
import org.springframework.amqp.core.AmqpAdmin;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
|
@ -336,6 +337,33 @@ public class RabbitAutoConfigurationTests {
|
|||
.getBean("rabbitListenerContainerFactory",
|
||||
SimpleRabbitListenerContainerFactory.class);
|
||||
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory);
|
||||
assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5);
|
||||
assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10);
|
||||
assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20);
|
||||
checkCommonProps(dfa);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
|
||||
load(new Class<?>[] { MessageConvertersConfiguration.class,
|
||||
MessageRecoverersConfiguration.class },
|
||||
"spring.rabbitmq.listener.type:direct",
|
||||
"spring.rabbitmq.listener.retry.enabled:true",
|
||||
"spring.rabbitmq.listener.retry.maxAttempts:4",
|
||||
"spring.rabbitmq.listener.retry.initialInterval:2000",
|
||||
"spring.rabbitmq.listener.retry.multiplier:1.5",
|
||||
"spring.rabbitmq.listener.retry.maxInterval:5000",
|
||||
"spring.rabbitmq.listener.autoStartup:false",
|
||||
"spring.rabbitmq.listener.acknowledgeMode:manual",
|
||||
"spring.rabbitmq.listener.consumers-per-queue:5",
|
||||
"spring.rabbitmq.listener.prefetch:40",
|
||||
"spring.rabbitmq.listener.defaultRequeueRejected:false",
|
||||
"spring.rabbitmq.listener.idleEventInterval:5");
|
||||
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
|
||||
.getBean("rabbitListenerContainerFactory",
|
||||
DirectRabbitListenerContainerFactory.class);
|
||||
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory);
|
||||
assertThat(dfa.getPropertyValue("consumersPerQueue")).isEqualTo(5);
|
||||
checkCommonProps(dfa);
|
||||
}
|
||||
|
||||
|
@ -343,10 +371,7 @@ public class RabbitAutoConfigurationTests {
|
|||
assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE);
|
||||
assertThat(dfa.getPropertyValue("acknowledgeMode"))
|
||||
.isEqualTo(AcknowledgeMode.MANUAL);
|
||||
assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5);
|
||||
assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10);
|
||||
assertThat(dfa.getPropertyValue("prefetchCount")).isEqualTo(40);
|
||||
assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20);
|
||||
assertThat(dfa.getPropertyValue("messageConverter"))
|
||||
.isSameAs(this.context.getBean("myMessageConverter"));
|
||||
assertThat(dfa.getPropertyValue("defaultRequeueRejected"))
|
||||
|
|
|
@ -977,10 +977,11 @@ content into your application; rather pick only the properties that you need.
|
|||
spring.rabbitmq.host=localhost # RabbitMQ host.
|
||||
spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container.
|
||||
spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup.
|
||||
spring.rabbitmq.listener.simple.concurrency= # Minimum number of consumers.
|
||||
spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads for a `simple` container.
|
||||
spring.rabbitmq.listener.consumers-per-queue= # The number of Consumers per queue for a `direct` container.
|
||||
spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`.
|
||||
spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds.
|
||||
spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of consumers.
|
||||
spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker for a `simple` container.
|
||||
spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
|
||||
spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled.
|
||||
spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message.
|
||||
|
@ -988,7 +989,8 @@ content into your application; rather pick only the properties that you need.
|
|||
spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts.
|
||||
spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval.
|
||||
spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful.
|
||||
spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count.
|
||||
spring.rabbitmq.listener.simple.transaction-size= Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count - applies only to `simple` containers.
|
||||
spring.rabbitmq.listener.type= # The listener container type `simple` or `direct`; default `simple`.
|
||||
spring.rabbitmq.password= # Login to authenticate against the broker.
|
||||
spring.rabbitmq.port=5672 # RabbitMQ port.
|
||||
spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.
|
||||
|
|
Loading…
Reference in New Issue