Add Retry Config: Template and Listener
Also add requeue rejected to listener config and timeouts to RabbitTemplate config. Closes gh-5340
This commit is contained in:
parent
f4bb9e3ba7
commit
08732fe4c8
|
|
@ -29,6 +29,8 @@ import org.springframework.amqp.support.converter.MessageConverter;
|
||||||
import org.springframework.beans.factory.ObjectProvider;
|
import org.springframework.beans.factory.ObjectProvider;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||||
|
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Retry;
|
||||||
|
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Template;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
|
@ -36,6 +38,9 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Import;
|
import org.springframework.context.annotation.Import;
|
||||||
|
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||||
|
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||||
|
import org.springframework.retry.support.RetryTemplate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link EnableAutoConfiguration Auto-configuration} for {@link RabbitTemplate}.
|
* {@link EnableAutoConfiguration Auto-configuration} for {@link RabbitTemplate}.
|
||||||
|
|
@ -94,12 +99,32 @@ public class RabbitAutoConfiguration {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@ConditionalOnMissingBean(RabbitTemplate.class)
|
@ConditionalOnMissingBean(RabbitTemplate.class)
|
||||||
public RabbitTemplate rabbitTemplate() {
|
public RabbitTemplate rabbitTemplate(RabbitProperties config) {
|
||||||
RabbitTemplate rabbitTemplate = new RabbitTemplate(this.connectionFactory);
|
RabbitTemplate rabbitTemplate = new RabbitTemplate(this.connectionFactory);
|
||||||
MessageConverter messageConverter = this.messageConverter.getIfUnique();
|
MessageConverter messageConverter = this.messageConverter.getIfUnique();
|
||||||
if (messageConverter != null) {
|
if (messageConverter != null) {
|
||||||
rabbitTemplate.setMessageConverter(messageConverter);
|
rabbitTemplate.setMessageConverter(messageConverter);
|
||||||
}
|
}
|
||||||
|
Template template = config.getTemplate();
|
||||||
|
Retry retry = template.getRetry();
|
||||||
|
if (retry.isEnable()) {
|
||||||
|
RetryTemplate retryTemplate = new RetryTemplate();
|
||||||
|
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
|
||||||
|
retryPolicy.setMaxAttempts(retry.getMaxAttempts());
|
||||||
|
retryTemplate.setRetryPolicy(retryPolicy);
|
||||||
|
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
|
||||||
|
backOffPolicy.setInitialInterval(retry.getInitialInterval());
|
||||||
|
backOffPolicy.setMultiplier(retry.getMultiplier());
|
||||||
|
backOffPolicy.setMaxInterval(retry.getMaxInterval());
|
||||||
|
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||||
|
rabbitTemplate.setRetryTemplate(retryTemplate);
|
||||||
|
}
|
||||||
|
if (template.getReceiveTimeout() != null) {
|
||||||
|
rabbitTemplate.setReceiveTimeout(template.getReceiveTimeout());
|
||||||
|
}
|
||||||
|
if (template.getReplyTimeout() != null) {
|
||||||
|
rabbitTemplate.setReplyTimeout(template.getReplyTimeout());
|
||||||
|
}
|
||||||
return rabbitTemplate;
|
return rabbitTemplate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,8 @@ public class RabbitProperties {
|
||||||
*/
|
*/
|
||||||
private final Listener listener = new Listener();
|
private final Listener listener = new Listener();
|
||||||
|
|
||||||
|
private final Template template = new Template();
|
||||||
|
|
||||||
public String getHost() {
|
public String getHost() {
|
||||||
if (this.addresses == null) {
|
if (this.addresses == null) {
|
||||||
return this.host;
|
return this.host;
|
||||||
|
|
@ -201,6 +203,10 @@ public class RabbitProperties {
|
||||||
return this.listener;
|
return this.listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Template getTemplate() {
|
||||||
|
return this.template;
|
||||||
|
}
|
||||||
|
|
||||||
public static class Ssl {
|
public static class Ssl {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -382,6 +388,16 @@ public class RabbitProperties {
|
||||||
*/
|
*/
|
||||||
private Integer transactionSize;
|
private Integer transactionSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether rejected deliveries are requeued by default; default true.
|
||||||
|
*/
|
||||||
|
private Boolean defaultRequeueRejected;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Optional properties for a retry interceptor.
|
||||||
|
*/
|
||||||
|
private final ListenerRetry retry = new ListenerRetry();
|
||||||
|
|
||||||
public boolean isAutoStartup() {
|
public boolean isAutoStartup() {
|
||||||
return this.autoStartup;
|
return this.autoStartup;
|
||||||
}
|
}
|
||||||
|
|
@ -429,6 +445,142 @@ public class RabbitProperties {
|
||||||
public void setTransactionSize(Integer transactionSize) {
|
public void setTransactionSize(Integer transactionSize) {
|
||||||
this.transactionSize = transactionSize;
|
this.transactionSize = transactionSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Boolean getDefaultRequeueRejected() {
|
||||||
|
return this.defaultRequeueRejected;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) {
|
||||||
|
this.defaultRequeueRejected = defaultRequeueRejected;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ListenerRetry getRetry() {
|
||||||
|
return this.retry;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Template {
|
||||||
|
|
||||||
|
private final Retry retry = new Retry();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeout for receive() operations.
|
||||||
|
*/
|
||||||
|
private Long receiveTimeout;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeout for sendAndReceive() operations.
|
||||||
|
*/
|
||||||
|
private Long replyTimeout;
|
||||||
|
|
||||||
|
public Retry getRetry() {
|
||||||
|
return this.retry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getReceiveTimeout() {
|
||||||
|
return this.receiveTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReceiveTimeout(Long receiveTimeout) {
|
||||||
|
this.receiveTimeout = receiveTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getReplyTimeout() {
|
||||||
|
return this.replyTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReplyTimeout(Long replyTimeout) {
|
||||||
|
this.replyTimeout = replyTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Retry {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether or not publishing retries are enabled.
|
||||||
|
*/
|
||||||
|
private boolean enable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of attempts to publish or deliver a message.
|
||||||
|
*/
|
||||||
|
private int maxAttempts = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The interval between the first and second attempt to publish
|
||||||
|
* or deliver a message.
|
||||||
|
*/
|
||||||
|
private long initialInterval = 1000L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A multiplier to apply to the previous retry interval.
|
||||||
|
*/
|
||||||
|
private double multiplier = 1.0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum interval between attempts.
|
||||||
|
*/
|
||||||
|
private long maxInterval = 10000L;
|
||||||
|
|
||||||
|
public boolean isEnable() {
|
||||||
|
return this.enable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnable(boolean enable) {
|
||||||
|
this.enable = enable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxAttempts() {
|
||||||
|
return this.maxAttempts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxAttempts(int maxAttempts) {
|
||||||
|
this.maxAttempts = maxAttempts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getInitialInterval() {
|
||||||
|
return this.initialInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setInitialInterval(long initialInterval) {
|
||||||
|
this.initialInterval = initialInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getMultiplier() {
|
||||||
|
return this.multiplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMultiplier(double multiplier) {
|
||||||
|
this.multiplier = multiplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxInterval() {
|
||||||
|
return this.maxInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxInterval(long maxInterval) {
|
||||||
|
this.maxInterval = maxInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ListenerRetry extends Retry {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether or not retries are stateless or stateful.
|
||||||
|
*/
|
||||||
|
private boolean stateless = true;
|
||||||
|
|
||||||
|
public boolean isStateless() {
|
||||||
|
return this.stateless;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStateless(boolean stateless) {
|
||||||
|
this.stateless = stateless;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,16 +16,20 @@
|
||||||
|
|
||||||
package org.springframework.boot.autoconfigure.amqp;
|
package org.springframework.boot.autoconfigure.amqp;
|
||||||
|
|
||||||
|
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
|
||||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
||||||
|
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
|
||||||
import org.springframework.amqp.support.converter.MessageConverter;
|
import org.springframework.amqp.support.converter.MessageConverter;
|
||||||
|
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
|
||||||
*
|
*
|
||||||
* @author Stephane Nicoll
|
* @author Stephane Nicoll
|
||||||
|
* @author Gary Russell
|
||||||
* @since 1.3.3
|
* @since 1.3.3
|
||||||
*/
|
*/
|
||||||
public final class SimpleRabbitListenerContainerFactoryConfigurer {
|
public final class SimpleRabbitListenerContainerFactoryConfigurer {
|
||||||
|
|
@ -83,6 +87,25 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer {
|
||||||
if (listenerConfig.getTransactionSize() != null) {
|
if (listenerConfig.getTransactionSize() != null) {
|
||||||
factory.setTxSize(listenerConfig.getTransactionSize());
|
factory.setTxSize(listenerConfig.getTransactionSize());
|
||||||
}
|
}
|
||||||
|
if (listenerConfig.getDefaultRequeueRejected() != null) {
|
||||||
|
factory.setDefaultRequeueRejected(listenerConfig.getDefaultRequeueRejected());
|
||||||
|
}
|
||||||
|
ListenerRetry retryConfig = listenerConfig.getRetry();
|
||||||
|
if (retryConfig.isEnable()) {
|
||||||
|
RetryInterceptorBuilder<?> builder;
|
||||||
|
if (retryConfig.isStateless()) {
|
||||||
|
builder = RetryInterceptorBuilder.stateless();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
builder = RetryInterceptorBuilder.stateful();
|
||||||
|
}
|
||||||
|
factory.setAdviceChain(builder
|
||||||
|
.maxAttempts(retryConfig.getMaxAttempts())
|
||||||
|
.backOffOptions(retryConfig.getInitialInterval(),
|
||||||
|
retryConfig.getMultiplier(), retryConfig.getMaxInterval())
|
||||||
|
.recoverer(new RejectAndDontRequeueRecoverer())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.amqp;
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
import javax.net.ssl.SSLSocketFactory;
|
import javax.net.ssl.SSLSocketFactory;
|
||||||
|
|
||||||
|
import org.aopalliance.aop.Advice;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -44,6 +45,9 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Primary;
|
import org.springframework.context.annotation.Primary;
|
||||||
|
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||||
|
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||||
|
import org.springframework.retry.support.RetryTemplate;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
@ -88,7 +92,7 @@ public class RabbitAutoConfigurationTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRabbitTemplateWithOverrides() {
|
public void testConnectionFactoryWithOverrides() {
|
||||||
load(TestConfiguration.class, "spring.rabbitmq.host:remote-server",
|
load(TestConfiguration.class, "spring.rabbitmq.host:remote-server",
|
||||||
"spring.rabbitmq.port:9000", "spring.rabbitmq.username:alice",
|
"spring.rabbitmq.port:9000", "spring.rabbitmq.username:alice",
|
||||||
"spring.rabbitmq.password:secret", "spring.rabbitmq.virtual_host:/vhost");
|
"spring.rabbitmq.password:secret", "spring.rabbitmq.virtual_host:/vhost");
|
||||||
|
|
@ -100,7 +104,7 @@ public class RabbitAutoConfigurationTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRabbitTemplateEmptyVirtualHost() {
|
public void testConnectionFactoryEmptyVirtualHost() {
|
||||||
load(TestConfiguration.class, "spring.rabbitmq.virtual_host:");
|
load(TestConfiguration.class, "spring.rabbitmq.virtual_host:");
|
||||||
CachingConnectionFactory connectionFactory = this.context
|
CachingConnectionFactory connectionFactory = this.context
|
||||||
.getBean(CachingConnectionFactory.class);
|
.getBean(CachingConnectionFactory.class);
|
||||||
|
|
@ -108,7 +112,7 @@ public class RabbitAutoConfigurationTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRabbitTemplateVirtualHostNoLeadingSlash() {
|
public void testConnectionFactoryVirtualHostNoLeadingSlash() {
|
||||||
load(TestConfiguration.class, "spring.rabbitmq.virtual_host:foo");
|
load(TestConfiguration.class, "spring.rabbitmq.virtual_host:foo");
|
||||||
CachingConnectionFactory connectionFactory = this.context
|
CachingConnectionFactory connectionFactory = this.context
|
||||||
.getBean(CachingConnectionFactory.class);
|
.getBean(CachingConnectionFactory.class);
|
||||||
|
|
@ -116,7 +120,7 @@ public class RabbitAutoConfigurationTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRabbitTemplateVirtualHostMultiLeadingSlashes() {
|
public void testConnectionFactoryVirtualHostMultiLeadingSlashes() {
|
||||||
load(TestConfiguration.class, "spring.rabbitmq.virtual_host:///foo");
|
load(TestConfiguration.class, "spring.rabbitmq.virtual_host:///foo");
|
||||||
CachingConnectionFactory connectionFactory = this.context
|
CachingConnectionFactory connectionFactory = this.context
|
||||||
.getBean(CachingConnectionFactory.class);
|
.getBean(CachingConnectionFactory.class);
|
||||||
|
|
@ -124,7 +128,7 @@ public class RabbitAutoConfigurationTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRabbitTemplateDefaultVirtualHost() {
|
public void testConnectionFactoryDefaultVirtualHost() {
|
||||||
load(TestConfiguration.class, "spring.rabbitmq.virtual_host:/");
|
load(TestConfiguration.class, "spring.rabbitmq.virtual_host:/");
|
||||||
CachingConnectionFactory connectionFactory = this.context
|
CachingConnectionFactory connectionFactory = this.context
|
||||||
.getBean(CachingConnectionFactory.class);
|
.getBean(CachingConnectionFactory.class);
|
||||||
|
|
@ -137,6 +141,32 @@ public class RabbitAutoConfigurationTests {
|
||||||
RabbitTemplate rabbitTemplate = this.context.getBean(RabbitTemplate.class);
|
RabbitTemplate rabbitTemplate = this.context.getBean(RabbitTemplate.class);
|
||||||
assertThat(rabbitTemplate.getMessageConverter())
|
assertThat(rabbitTemplate.getMessageConverter())
|
||||||
.isSameAs(this.context.getBean("myMessageConverter"));
|
.isSameAs(this.context.getBean("myMessageConverter"));
|
||||||
|
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitTemplate);
|
||||||
|
assertThat(dfa.getPropertyValue("retryTemplate")).isNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRabbitTemplateRetry() {
|
||||||
|
load(TestConfiguration.class, "spring.rabbitmq.template.retry.enable:true",
|
||||||
|
"spring.rabbitmq.template.retry.max-attempts:4",
|
||||||
|
"spring.rabbitmq.template.retry.initial-interval:2000",
|
||||||
|
"spring.rabbitmq.template.retry.multiplier:1.5",
|
||||||
|
"spring.rabbitmq.template.retry.max-interval:5000",
|
||||||
|
"spring.rabbitmq.template.receiveTimeout:123",
|
||||||
|
"spring.rabbitmq.template.replyTimeout:456");
|
||||||
|
RabbitTemplate rabbitTemplate = this.context.getBean(RabbitTemplate.class);
|
||||||
|
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitTemplate);
|
||||||
|
assertThat(dfa.getPropertyValue("receiveTimeout")).isEqualTo(123L);
|
||||||
|
assertThat(dfa.getPropertyValue("replyTimeout")).isEqualTo(456L);
|
||||||
|
RetryTemplate retryTemplate = (RetryTemplate) dfa.getPropertyValue("retryTemplate");
|
||||||
|
assertThat(retryTemplate).isNotNull();
|
||||||
|
dfa = new DirectFieldAccessor(retryTemplate);
|
||||||
|
SimpleRetryPolicy retryPolicy = (SimpleRetryPolicy) dfa.getPropertyValue("retryPolicy");
|
||||||
|
ExponentialBackOffPolicy backOffPolicy = (ExponentialBackOffPolicy) dfa.getPropertyValue("backOffPolicy");
|
||||||
|
assertThat(retryPolicy.getMaxAttempts()).isEqualTo(4);
|
||||||
|
assertThat(backOffPolicy.getInitialInterval()).isEqualTo(2000);
|
||||||
|
assertThat(backOffPolicy.getMultiplier()).isEqualTo(1.5);
|
||||||
|
assertThat(backOffPolicy.getMaxInterval()).isEqualTo(5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -210,16 +240,25 @@ public class RabbitAutoConfigurationTests {
|
||||||
SimpleRabbitListenerContainerFactory.class);
|
SimpleRabbitListenerContainerFactory.class);
|
||||||
rabbitListenerContainerFactory.setTxSize(10);
|
rabbitListenerContainerFactory.setTxSize(10);
|
||||||
verify(rabbitListenerContainerFactory).setTxSize(10);
|
verify(rabbitListenerContainerFactory).setTxSize(10);
|
||||||
|
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory);
|
||||||
|
Advice[] adviceChain = (Advice[]) dfa.getPropertyValue("adviceChain");
|
||||||
|
assertThat(adviceChain).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRabbitListenerContainerFactoryWithCustomSettings() {
|
public void testRabbitListenerContainerFactoryWithCustomSettings() {
|
||||||
load(MessageConvertersConfiguration.class,
|
load(MessageConvertersConfiguration.class,
|
||||||
|
"spring.rabbitmq.listener.retry.enable:true",
|
||||||
|
"spring.rabbitmq.listener.retry.max-attempts:4",
|
||||||
|
"spring.rabbitmq.listener.retry.initial-interval:2000",
|
||||||
|
"spring.rabbitmq.listener.retry.multiplier:1.5",
|
||||||
|
"spring.rabbitmq.listener.retry.max-interval:5000",
|
||||||
"spring.rabbitmq.listener.autoStartup:false",
|
"spring.rabbitmq.listener.autoStartup:false",
|
||||||
"spring.rabbitmq.listener.acknowledgeMode:manual",
|
"spring.rabbitmq.listener.acknowledgeMode:manual",
|
||||||
"spring.rabbitmq.listener.concurrency:5",
|
"spring.rabbitmq.listener.concurrency:5",
|
||||||
"spring.rabbitmq.listener.maxConcurrency:10",
|
"spring.rabbitmq.listener.maxConcurrency:10",
|
||||||
"spring.rabbitmq.listener.prefetch=40",
|
"spring.rabbitmq.listener.prefetch:40",
|
||||||
|
"spring.rabbitmq.listener.default-requeue-rejected:false",
|
||||||
"spring.rabbitmq.listener.transactionSize:20");
|
"spring.rabbitmq.listener.transactionSize:20");
|
||||||
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
|
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
|
||||||
.getBean("rabbitListenerContainerFactory",
|
.getBean("rabbitListenerContainerFactory",
|
||||||
|
|
@ -234,6 +273,20 @@ public class RabbitAutoConfigurationTests {
|
||||||
assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20);
|
assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20);
|
||||||
assertThat(dfa.getPropertyValue("messageConverter"))
|
assertThat(dfa.getPropertyValue("messageConverter"))
|
||||||
.isSameAs(this.context.getBean("myMessageConverter"));
|
.isSameAs(this.context.getBean("myMessageConverter"));
|
||||||
|
assertThat(dfa.getPropertyValue("defaultRequeueRejected")).isEqualTo(Boolean.FALSE);
|
||||||
|
Advice[] adviceChain = (Advice[]) dfa.getPropertyValue("adviceChain");
|
||||||
|
assertThat(adviceChain).isNotNull();
|
||||||
|
assertThat(adviceChain.length).isEqualTo(1);
|
||||||
|
dfa = new DirectFieldAccessor(adviceChain[0]);
|
||||||
|
RetryTemplate retryTemplate = (RetryTemplate) dfa.getPropertyValue("retryOperations");
|
||||||
|
assertThat(retryTemplate).isNotNull();
|
||||||
|
dfa = new DirectFieldAccessor(retryTemplate);
|
||||||
|
SimpleRetryPolicy retryPolicy = (SimpleRetryPolicy) dfa.getPropertyValue("retryPolicy");
|
||||||
|
ExponentialBackOffPolicy backOffPolicy = (ExponentialBackOffPolicy) dfa.getPropertyValue("backOffPolicy");
|
||||||
|
assertThat(retryPolicy.getMaxAttempts()).isEqualTo(4);
|
||||||
|
assertThat(backOffPolicy.getInitialInterval()).isEqualTo(2000);
|
||||||
|
assertThat(backOffPolicy.getMultiplier()).isEqualTo(1.5);
|
||||||
|
assertThat(backOffPolicy.getMaxInterval()).isEqualTo(5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -768,8 +768,15 @@ content into your application; rather pick only the properties that you need.
|
||||||
spring.rabbitmq.listener.acknowledge-mode= # Acknowledge mode of container.
|
spring.rabbitmq.listener.acknowledge-mode= # Acknowledge mode of container.
|
||||||
spring.rabbitmq.listener.auto-startup=true # Start the container automatically on startup.
|
spring.rabbitmq.listener.auto-startup=true # Start the container automatically on startup.
|
||||||
spring.rabbitmq.listener.concurrency= # Minimum number of consumers.
|
spring.rabbitmq.listener.concurrency= # Minimum number of consumers.
|
||||||
|
spring.rabbitmq.listener.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`.
|
||||||
spring.rabbitmq.listener.max-concurrency= # Maximum number of consumers.
|
spring.rabbitmq.listener.max-concurrency= # Maximum number of consumers.
|
||||||
spring.rabbitmq.listener.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
|
spring.rabbitmq.listener.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
|
||||||
|
spring.rabbitmq.listener.retry.enable= # Set to true to enable stateless retries for listener containers.
|
||||||
|
spring.rabbitmq.listener.retry.initial-interval=1000 # The interval between the first and second attempt to deliver a message.
|
||||||
|
spring.rabbitmq.listener.retry.max-attempts=3 # The maximum number of attempts to deliver a message.
|
||||||
|
spring.rabbitmq.listener.retry.max-interval=10000 # The maximum number of attempts to deliver a message.
|
||||||
|
spring.rabbitmq.listener.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval.
|
||||||
|
spring.rabbitmq.listener.retry.stateless=true # Whether or not retry is stateless or stateful.
|
||||||
spring.rabbitmq.listener.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count.
|
spring.rabbitmq.listener.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count.
|
||||||
spring.rabbitmq.password= # Login to authenticate against the broker.
|
spring.rabbitmq.password= # Login to authenticate against the broker.
|
||||||
spring.rabbitmq.port=5672 # RabbitMQ port.
|
spring.rabbitmq.port=5672 # RabbitMQ port.
|
||||||
|
|
@ -779,6 +786,13 @@ content into your application; rather pick only the properties that you need.
|
||||||
spring.rabbitmq.ssl.key-store-password= # Password used to access the key store.
|
spring.rabbitmq.ssl.key-store-password= # Password used to access the key store.
|
||||||
spring.rabbitmq.ssl.trust-store= # Trust store that holds SSL certificates.
|
spring.rabbitmq.ssl.trust-store= # Trust store that holds SSL certificates.
|
||||||
spring.rabbitmq.ssl.trust-store-password= # Password used to access the trust store.
|
spring.rabbitmq.ssl.trust-store-password= # Password used to access the trust store.
|
||||||
|
spring.rabbitmq.template.receiveTimeout=0 # Timeout for `receive()` methods.
|
||||||
|
spring.rabbitmq.template.replyTimeout=5000 # Timeout for `sendAndReceive()` methods.
|
||||||
|
spring.rabbitmq.template.retry.enable= # Set to true to enable retries in the `RabbitTemplate`.
|
||||||
|
spring.rabbitmq.template.retry.initial-interval=1000 # The interval between the first and second attempt to publish a message.
|
||||||
|
spring.rabbitmq.template.retry.max-attempts=3 # The maximum number of attempts to publish a message.
|
||||||
|
spring.rabbitmq.template.retry.max-interval=10000 # The maximum number of attempts to publish a message.
|
||||||
|
spring.rabbitmq.template.retry.multiplier=1.0 # A multiplier to apply to the previous publishing retry interval.
|
||||||
spring.rabbitmq.username= # Login user to authenticate to the broker.
|
spring.rabbitmq.username= # Login user to authenticate to the broker.
|
||||||
spring.rabbitmq.virtual-host= # Virtual host to use when connecting to the broker.
|
spring.rabbitmq.virtual-host= # Virtual host to use when connecting to the broker.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3801,7 +3801,9 @@ automatically to the auto-configured `AmqpTemplate`.
|
||||||
Any `org.springframework.amqp.core.Queue` that is defined as a bean will be automatically
|
Any `org.springframework.amqp.core.Queue` that is defined as a bean will be automatically
|
||||||
used to declare a corresponding queue on the RabbitMQ instance if necessary.
|
used to declare a corresponding queue on the RabbitMQ instance if necessary.
|
||||||
|
|
||||||
|
You can enable retries on the `AmqpTemplate` to retry operations, for example in the event the broker connection is
|
||||||
|
lost.
|
||||||
|
Retries are disabled by default.
|
||||||
|
|
||||||
[[boot-features-using-amqp-receiving]]
|
[[boot-features-using-amqp-receiving]]
|
||||||
==== Receiving a message
|
==== Receiving a message
|
||||||
|
|
@ -3868,6 +3870,16 @@ That you can use in any `@RabbitListener`-annotated method as follows:
|
||||||
}
|
}
|
||||||
----
|
----
|
||||||
|
|
||||||
|
You can enable retries to handle situations where your listener throws an exception.
|
||||||
|
When retries are exhausted, the message will be rejected and either dropped or routed to a dead-letter exchange
|
||||||
|
if the broker is so configured.
|
||||||
|
Retries are disabled by default.
|
||||||
|
|
||||||
|
IMPORTANT: If retries are not enabled and the listener throws an exception, by default the delivery will be retried
|
||||||
|
indefinitely.
|
||||||
|
You can modify this behavior in two ways; set the `defaultRequeueRejected` property to `false` and zero redeliveries
|
||||||
|
will be attempted; or, throw an `AmqpRejectAndDontRequeueException` to signal the message should be rejected.
|
||||||
|
This is the mechanism used when retries are enabled and the maximum delivery attempts is reached.
|
||||||
|
|
||||||
[[boot-features-email]]
|
[[boot-features-email]]
|
||||||
== Sending email
|
== Sending email
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue