Customize default `RabbitListenerFactory`
Expose acknowledgment mode, concurrency, prefecth and transaction size settings in configuration for the default `RabbitListenerContainerFactory`. Closes gh-3479
This commit is contained in:
parent
22a7b0cdee
commit
1683d8a8f3
|
@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
|||
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Listener;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
@ -41,6 +42,22 @@ class RabbitAnnotationDrivenConfiguration {
|
|||
ConnectionFactory connectionFactory, RabbitProperties config) {
|
||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||
factory.setConnectionFactory(connectionFactory);
|
||||
Listener listenerConfig = config.getListener();
|
||||
if (listenerConfig.getAckMode() != null) {
|
||||
factory.setAcknowledgeMode(listenerConfig.getAckMode());
|
||||
}
|
||||
if (listenerConfig.getConcurrency() != null) {
|
||||
factory.setConcurrentConsumers(listenerConfig.getConcurrency());
|
||||
}
|
||||
if (listenerConfig.getMaxConcurrency() != null) {
|
||||
factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency());
|
||||
}
|
||||
if (listenerConfig.getPrefetch() != null) {
|
||||
factory.setPrefetchCount(listenerConfig.getPrefetch());
|
||||
}
|
||||
if (listenerConfig.getTxSize() != null) {
|
||||
factory.setTxSize(listenerConfig.getTxSize());
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.LinkedHashSet;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.springframework.amqp.core.AcknowledgeMode;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
|
@ -74,6 +75,11 @@ public class RabbitProperties {
|
|||
*/
|
||||
private Integer requestedHeartbeat;
|
||||
|
||||
/**
|
||||
* Listener container configuration.
|
||||
*/
|
||||
private final Listener listener = new Listener();
|
||||
|
||||
public String getHost() {
|
||||
if (this.addresses == null) {
|
||||
return this.host;
|
||||
|
@ -180,6 +186,10 @@ public class RabbitProperties {
|
|||
this.requestedHeartbeat = requestedHeartbeat;
|
||||
}
|
||||
|
||||
public Listener getListener() {
|
||||
return listener;
|
||||
}
|
||||
|
||||
public static class Ssl {
|
||||
|
||||
/**
|
||||
|
@ -271,4 +281,73 @@ public class RabbitProperties {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Listener {
|
||||
|
||||
/**
|
||||
* Acknowledge mode of container.
|
||||
*/
|
||||
private AcknowledgeMode ackMode;
|
||||
|
||||
/**
|
||||
* Minimum number of consumers.
|
||||
*/
|
||||
private Integer concurrency;
|
||||
|
||||
/**
|
||||
* Maximum number of consumers.
|
||||
*/
|
||||
private Integer maxConcurrency;
|
||||
|
||||
/**
|
||||
* Message prefetch count.
|
||||
*/
|
||||
private Integer prefetch;
|
||||
|
||||
/**
|
||||
* Number of messages in a transaction.
|
||||
*/
|
||||
private Integer txSize;
|
||||
|
||||
public AcknowledgeMode getAckMode() {
|
||||
return ackMode;
|
||||
}
|
||||
|
||||
public void setAckMode(AcknowledgeMode ackMode) {
|
||||
this.ackMode = ackMode;
|
||||
}
|
||||
|
||||
public Integer getConcurrency() {
|
||||
return concurrency;
|
||||
}
|
||||
|
||||
public void setConcurrency(Integer concurrency) {
|
||||
this.concurrency = concurrency;
|
||||
}
|
||||
|
||||
public Integer getMaxConcurrency() {
|
||||
return maxConcurrency;
|
||||
}
|
||||
|
||||
public void setMaxConcurrency(Integer maxConcurrency) {
|
||||
this.maxConcurrency = maxConcurrency;
|
||||
}
|
||||
|
||||
public Integer getPrefetch() {
|
||||
return prefetch;
|
||||
}
|
||||
|
||||
public void setPrefetch(Integer prefetch) {
|
||||
this.prefetch = prefetch;
|
||||
}
|
||||
|
||||
public Integer getTxSize() {
|
||||
return txSize;
|
||||
}
|
||||
|
||||
public void setTxSize(Integer txSize) {
|
||||
this.txSize = txSize;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.junit.After;
|
|||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import org.springframework.amqp.core.AcknowledgeMode;
|
||||
import org.springframework.amqp.core.AmqpAdmin;
|
||||
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
||||
|
@ -228,6 +230,28 @@ public class RabbitAutoConfigurationTests {
|
|||
"spring.rabbitmq.ssl.trustStorePassword=secret");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRabbitListenerContainerFactoryOverrides() {
|
||||
load(TestConfiguration.class, "spring.rabbitmq.listener.prefetch:20",
|
||||
"spring.rabbitmq.listener.ackMode:MANUAL",
|
||||
"spring.rabbitmq.listener.concurrency:10",
|
||||
"spring.rabbitmq.listener.maxConcurrency:10",
|
||||
"spring.rabbitmq.listener.txSize:20");
|
||||
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
|
||||
.getBean("rabbitListenerContainerFactory",
|
||||
SimpleRabbitListenerContainerFactory.class);
|
||||
assertEquals(new Integer(20), (Integer)
|
||||
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("prefetchCount"));
|
||||
assertEquals(AcknowledgeMode.MANUAL, (AcknowledgeMode)
|
||||
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("acknowledgeMode"));
|
||||
assertEquals(new Integer(10), (Integer)
|
||||
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("concurrentConsumers"));
|
||||
assertEquals(new Integer(10), (Integer)
|
||||
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("maxConcurrentConsumers"));
|
||||
assertEquals(new Integer(20), (Integer)
|
||||
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("txSize"));
|
||||
}
|
||||
|
||||
private com.rabbitmq.client.ConnectionFactory getTargetConnectionFactory() {
|
||||
CachingConnectionFactory connectionFactory = this.context
|
||||
.getBean(CachingConnectionFactory.class);
|
||||
|
|
Loading…
Reference in New Issue