Allow to customize the RabbitMQ RetryTemplate
This commit adds the ability to customize the RetryTemplate used in the RabbitMQ infrastructure. The customizer is slightly unusual and offer a `Target` enum that define the component that will use the retry template: `SENDER` for the auto-configured `RabbitTemplate` and `LISTENER` for a listener container created by a `RabbitListenerContainerFactoryConfigurer`. Closes gh-13793
This commit is contained in:
parent
58efd1b51a
commit
ada699a9f6
|
@ -16,6 +16,8 @@
|
|||
|
||||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
|
@ -40,6 +42,8 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
|
|||
|
||||
private MessageRecoverer messageRecoverer;
|
||||
|
||||
private List<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;
|
||||
|
||||
private RabbitProperties rabbitProperties;
|
||||
|
||||
/**
|
||||
|
@ -59,6 +63,15 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
|
|||
this.messageRecoverer = messageRecoverer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link RabbitRetryTemplateCustomizer} instances to use.
|
||||
* @param retryTemplateCustomizers the retry template customizers
|
||||
*/
|
||||
protected void setRetryTemplateCustomizers(
|
||||
List<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
|
||||
this.retryTemplateCustomizers = retryTemplateCustomizers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link RabbitProperties} to use.
|
||||
* @param rabbitProperties the {@link RabbitProperties}
|
||||
|
@ -108,7 +121,9 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
|
|||
? RetryInterceptorBuilder.stateless()
|
||||
: RetryInterceptorBuilder.stateful());
|
||||
builder.retryOperations(
|
||||
new RetryTemplateFactory().createRetryTemplate(retryConfig));
|
||||
new RetryTemplateFactory(this.retryTemplateCustomizers)
|
||||
.createRetryTemplate(retryConfig,
|
||||
RabbitRetryTemplateCustomizer.Target.LISTENER));
|
||||
MessageRecoverer recoverer = (this.messageRecoverer != null
|
||||
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
|
||||
builder.recoverer(recoverer);
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
||||
|
@ -45,13 +47,17 @@ class RabbitAnnotationDrivenConfiguration {
|
|||
|
||||
private final ObjectProvider<MessageRecoverer> messageRecoverer;
|
||||
|
||||
private final ObjectProvider<List<RabbitRetryTemplateCustomizer>> retryTemplateCustomizers;
|
||||
|
||||
private final RabbitProperties properties;
|
||||
|
||||
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
|
||||
ObjectProvider<MessageRecoverer> messageRecoverer,
|
||||
ObjectProvider<List<RabbitRetryTemplateCustomizer>> retryTemplateCustomizers,
|
||||
RabbitProperties properties) {
|
||||
this.messageConverter = messageConverter;
|
||||
this.messageRecoverer = messageRecoverer;
|
||||
this.retryTemplateCustomizers = retryTemplateCustomizers;
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
|
@ -61,6 +67,8 @@ class RabbitAnnotationDrivenConfiguration {
|
|||
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
|
||||
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
||||
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
|
||||
configurer.setRetryTemplateCustomizers(
|
||||
this.retryTemplateCustomizers.getIfAvailable());
|
||||
configurer.setRabbitProperties(this.properties);
|
||||
return configurer;
|
||||
}
|
||||
|
@ -82,6 +90,8 @@ class RabbitAnnotationDrivenConfiguration {
|
|||
DirectRabbitListenerContainerFactoryConfigurer configurer = new DirectRabbitListenerContainerFactoryConfigurer();
|
||||
configurer.setMessageConverter(this.messageConverter.getIfUnique());
|
||||
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
|
||||
configurer.setRetryTemplateCustomizers(
|
||||
this.retryTemplateCustomizers.getIfAvailable());
|
||||
configurer.setRabbitProperties(this.properties);
|
||||
return configurer;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
|
||||
|
@ -151,15 +152,18 @@ public class RabbitAutoConfiguration {
|
|||
@Import(RabbitConnectionFactoryCreator.class)
|
||||
protected static class RabbitTemplateConfiguration {
|
||||
|
||||
private final ObjectProvider<MessageConverter> messageConverter;
|
||||
|
||||
private final RabbitProperties properties;
|
||||
|
||||
public RabbitTemplateConfiguration(
|
||||
private final ObjectProvider<MessageConverter> messageConverter;
|
||||
|
||||
private final ObjectProvider<List<RabbitRetryTemplateCustomizer>> retryTemplateCustomizers;
|
||||
|
||||
public RabbitTemplateConfiguration(RabbitProperties properties,
|
||||
ObjectProvider<MessageConverter> messageConverter,
|
||||
RabbitProperties properties) {
|
||||
this.messageConverter = messageConverter;
|
||||
ObjectProvider<List<RabbitRetryTemplateCustomizer>> retryTemplateCustomizers) {
|
||||
this.properties = properties;
|
||||
this.messageConverter = messageConverter;
|
||||
this.retryTemplateCustomizers = retryTemplateCustomizers;
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
@ -175,8 +179,10 @@ public class RabbitAutoConfiguration {
|
|||
template.setMandatory(determineMandatoryFlag());
|
||||
RabbitProperties.Template properties = this.properties.getTemplate();
|
||||
if (properties.getRetry().isEnabled()) {
|
||||
template.setRetryTemplate(new RetryTemplateFactory()
|
||||
.createRetryTemplate(properties.getRetry()));
|
||||
template.setRetryTemplate(new RetryTemplateFactory(
|
||||
this.retryTemplateCustomizers.getIfAvailable())
|
||||
.createRetryTemplate(properties.getRetry(),
|
||||
RabbitRetryTemplateCustomizer.Target.SENDER));
|
||||
}
|
||||
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
|
||||
.to(template::setReceiveTimeout);
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
|
||||
/**
|
||||
* Callback interface that can be used to customize a {@link RetryTemplate} used as part
|
||||
* of the Rabbit infrastructure.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 2.1.0
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface RabbitRetryTemplateCustomizer {
|
||||
|
||||
/**
|
||||
* Callback to customize a {@link RetryTemplate} instance used in the context of the
|
||||
* specified {@link Target}.
|
||||
* @param target the {@link Target} of the retry template
|
||||
* @param retryTemplate the template to customize
|
||||
*/
|
||||
void customize(Target target, RetryTemplate retryTemplate);
|
||||
|
||||
/**
|
||||
* Define the available target for a {@link RetryTemplate}.
|
||||
*/
|
||||
enum Target {
|
||||
|
||||
/**
|
||||
* {@link RabbitTemplate} target.
|
||||
*/
|
||||
SENDER,
|
||||
|
||||
/**
|
||||
* {@link AbstractMessageListenerContainer} target.
|
||||
*/
|
||||
LISTENER
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
|
@ -31,7 +32,14 @@ import org.springframework.retry.support.RetryTemplate;
|
|||
*/
|
||||
class RetryTemplateFactory {
|
||||
|
||||
public RetryTemplate createRetryTemplate(RabbitProperties.Retry properties) {
|
||||
private final List<RabbitRetryTemplateCustomizer> customizers;
|
||||
|
||||
RetryTemplateFactory(List<RabbitRetryTemplateCustomizer> customizers) {
|
||||
this.customizers = customizers;
|
||||
}
|
||||
|
||||
public RetryTemplate createRetryTemplate(RabbitProperties.Retry properties,
|
||||
RabbitRetryTemplateCustomizer.Target target) {
|
||||
PropertyMapper map = PropertyMapper.get();
|
||||
RetryTemplate template = new RetryTemplate();
|
||||
SimpleRetryPolicy policy = new SimpleRetryPolicy();
|
||||
|
@ -44,6 +52,11 @@ class RetryTemplateFactory {
|
|||
map.from(properties::getMaxInterval).whenNonNull().as(Duration::toMillis)
|
||||
.to(backOffPolicy::setMaxInterval);
|
||||
template.setBackOffPolicy(backOffPolicy);
|
||||
if (this.customizers != null) {
|
||||
for (RabbitRetryTemplateCustomizer customizer : this.customizers) {
|
||||
customizer.customize(target, template);
|
||||
}
|
||||
}
|
||||
return template;
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.springframework.amqp.core.AcknowledgeMode;
|
|||
import org.springframework.amqp.core.AmqpAdmin;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
|
@ -54,8 +55,11 @@ import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
|||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.retry.RetryPolicy;
|
||||
import org.springframework.retry.backoff.BackOffPolicy;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
|
||||
import org.springframework.retry.policy.NeverRetryPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
|
||||
|
@ -280,6 +284,28 @@ public class RabbitAutoConfigurationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRabbitTemplateRetryWithCustomizer() {
|
||||
this.contextRunner
|
||||
.withUserConfiguration(RabbitRetryTemplateCustomizerConfiguration.class)
|
||||
.withPropertyValues("spring.rabbitmq.template.retry.enabled:true",
|
||||
"spring.rabbitmq.template.retry.initialInterval:2000")
|
||||
.run((context) -> {
|
||||
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
|
||||
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitTemplate);
|
||||
RetryTemplate retryTemplate = (RetryTemplate) dfa
|
||||
.getPropertyValue("retryTemplate");
|
||||
assertThat(retryTemplate).isNotNull();
|
||||
dfa = new DirectFieldAccessor(retryTemplate);
|
||||
assertThat(dfa.getPropertyValue("backOffPolicy"))
|
||||
.isSameAs(context.getBean(
|
||||
RabbitRetryTemplateCustomizerConfiguration.class).backOffPolicy);
|
||||
ExponentialBackOffPolicy backOffPolicy = (ExponentialBackOffPolicy) dfa
|
||||
.getPropertyValue("backOffPolicy");
|
||||
assertThat(backOffPolicy.getInitialInterval()).isEqualTo(100);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRabbitTemplateExchangeAndRoutingKey() {
|
||||
this.contextRunner.withUserConfiguration(TestConfiguration.class)
|
||||
|
@ -470,6 +496,53 @@ public class RabbitAutoConfigurationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleRabbitListenerContainerFactoryRetryWithCustomizer() {
|
||||
this.contextRunner
|
||||
.withUserConfiguration(RabbitRetryTemplateCustomizerConfiguration.class)
|
||||
.withPropertyValues("spring.rabbitmq.listener.simple.retry.enabled:true",
|
||||
"spring.rabbitmq.listener.simple.retry.maxAttempts:4")
|
||||
.run((context) -> {
|
||||
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
|
||||
.getBean("rabbitListenerContainerFactory",
|
||||
SimpleRabbitListenerContainerFactory.class);
|
||||
assertListenerRetryTemplate(rabbitListenerContainerFactory,
|
||||
context.getBean(
|
||||
RabbitRetryTemplateCustomizerConfiguration.class).retryPolicy);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectRabbitListenerContainerFactoryRetryWithCustomizer() {
|
||||
this.contextRunner
|
||||
.withUserConfiguration(RabbitRetryTemplateCustomizerConfiguration.class)
|
||||
.withPropertyValues("spring.rabbitmq.listener.type:direct",
|
||||
"spring.rabbitmq.listener.direct.retry.enabled:true",
|
||||
"spring.rabbitmq.listener.direct.retry.maxAttempts:4")
|
||||
.run((context) -> {
|
||||
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = context
|
||||
.getBean("rabbitListenerContainerFactory",
|
||||
DirectRabbitListenerContainerFactory.class);
|
||||
assertListenerRetryTemplate(rabbitListenerContainerFactory,
|
||||
context.getBean(
|
||||
RabbitRetryTemplateCustomizerConfiguration.class).retryPolicy);
|
||||
});
|
||||
}
|
||||
|
||||
private void assertListenerRetryTemplate(
|
||||
AbstractRabbitListenerContainerFactory<?> rabbitListenerContainerFactory,
|
||||
RetryPolicy retryPolicy) {
|
||||
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory);
|
||||
Advice[] adviceChain = (Advice[]) dfa.getPropertyValue("adviceChain");
|
||||
assertThat(adviceChain).isNotNull();
|
||||
assertThat(adviceChain.length).isEqualTo(1);
|
||||
dfa = new DirectFieldAccessor(adviceChain[0]);
|
||||
RetryTemplate retryTemplate = (RetryTemplate) dfa
|
||||
.getPropertyValue("retryOperations");
|
||||
dfa = new DirectFieldAccessor(retryTemplate);
|
||||
assertThat(dfa.getPropertyValue("retryPolicy")).isSameAs(retryPolicy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRabbitListenerContainerFactoryConfigurersAreAvailable() {
|
||||
this.contextRunner.withUserConfiguration(TestConfiguration.class)
|
||||
|
@ -793,6 +866,33 @@ public class RabbitAutoConfigurationTests {
|
|||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
protected static class RabbitRetryTemplateCustomizerConfiguration {
|
||||
|
||||
private final BackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
|
||||
|
||||
private final RetryPolicy retryPolicy = new NeverRetryPolicy();
|
||||
|
||||
@Bean
|
||||
public RabbitRetryTemplateCustomizer rabbitTemplateRetryTemplateCustomizer() {
|
||||
return (target, template) -> {
|
||||
if (target.equals(RabbitRetryTemplateCustomizer.Target.SENDER)) {
|
||||
template.setBackOffPolicy(this.backOffPolicy);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RabbitRetryTemplateCustomizer rabbitListenerRetryTemplateCustomizer() {
|
||||
return (target, template) -> {
|
||||
if (target.equals(RabbitRetryTemplateCustomizer.Target.LISTENER)) {
|
||||
template.setRetryPolicy(this.retryPolicy);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableRabbit
|
||||
protected static class EnableRabbitConfiguration {
|
||||
|
|
|
@ -5362,7 +5362,16 @@ If necessary, any `org.springframework.amqp.core.Queue` that is defined as a bea
|
|||
automatically used to declare a corresponding queue on the RabbitMQ instance.
|
||||
|
||||
To retry operations, you can enable retries on the `AmqpTemplate` (for example, in the
|
||||
event that the broker connection is lost). Retries are disabled by default.
|
||||
event that the broker connection is lost):
|
||||
|
||||
[source,properties,indent=0]
|
||||
----
|
||||
spring.rabbitmq.template.retry.enabled=true
|
||||
spring.rabbitmq.template.retry.initial-interval=2s
|
||||
----
|
||||
|
||||
Retries are disabled by default. You can also customize the `RetryTemplate`
|
||||
programmatically by declaring a `RabbitRetryTemplateCustomizer` bean.
|
||||
|
||||
|
||||
|
||||
|
@ -5444,7 +5453,8 @@ You can enable retries to handle situations where your listener throws an except
|
|||
default, `RejectAndDontRequeueRecoverer` is used, but you can define a `MessageRecoverer`
|
||||
of your own. When retries are exhausted, the message is rejected and either dropped or
|
||||
routed to a dead-letter exchange if the broker is configured to do so. By default,
|
||||
retries are disabled.
|
||||
retries are disabled. You can also customize the `RetryTemplate` programmatically by
|
||||
declaring a `RabbitRetryTemplateCustomizer` bean.
|
||||
|
||||
IMPORTANT: By default, if retries are disabled and the listener throws an exception, the
|
||||
delivery is retried indefinitely. You can modify this behavior in two ways: Set the
|
||||
|
|
Loading…
Reference in New Issue