Closes gh-3475
This commit is contained in:
Stephane Nicoll 2015-07-16 15:22:38 +02:00
parent 1683d8a8f3
commit aa483984c5
4 changed files with 49 additions and 42 deletions

View File

@ -30,6 +30,7 @@ import org.springframework.context.annotation.Configuration;
* Configuration for Spring AMQP annotation driven endpoints. * Configuration for Spring AMQP annotation driven endpoints.
* *
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Josh Thornhill
* @since 1.2.0 * @since 1.2.0
*/ */
@Configuration @Configuration
@ -43,8 +44,8 @@ class RabbitAnnotationDrivenConfiguration {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory); factory.setConnectionFactory(connectionFactory);
Listener listenerConfig = config.getListener(); Listener listenerConfig = config.getListener();
if (listenerConfig.getAckMode() != null) { if (listenerConfig.getAcknowledgeMode() != null) {
factory.setAcknowledgeMode(listenerConfig.getAckMode()); factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode());
} }
if (listenerConfig.getConcurrency() != null) { if (listenerConfig.getConcurrency() != null) {
factory.setConcurrentConsumers(listenerConfig.getConcurrency()); factory.setConcurrentConsumers(listenerConfig.getConcurrency());
@ -55,8 +56,8 @@ class RabbitAnnotationDrivenConfiguration {
if (listenerConfig.getPrefetch() != null) { if (listenerConfig.getPrefetch() != null) {
factory.setPrefetchCount(listenerConfig.getPrefetch()); factory.setPrefetchCount(listenerConfig.getPrefetch());
} }
if (listenerConfig.getTxSize() != null) { if (listenerConfig.getTransactionSize() != null) {
factory.setTxSize(listenerConfig.getTxSize()); factory.setTxSize(listenerConfig.getTransactionSize());
} }
return factory; return factory;
} }

View File

@ -31,6 +31,7 @@ import org.springframework.util.StringUtils;
* @author Dave Syer * @author Dave Syer
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Andy Wilkinson * @author Andy Wilkinson
* @author Josh Thornhill
*/ */
@ConfigurationProperties(prefix = "spring.rabbitmq") @ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties { public class RabbitProperties {
@ -187,7 +188,7 @@ public class RabbitProperties {
} }
public Listener getListener() { public Listener getListener() {
return listener; return this.listener;
} }
public static class Ssl { public static class Ssl {
@ -287,7 +288,7 @@ public class RabbitProperties {
/** /**
* Acknowledge mode of container. * Acknowledge mode of container.
*/ */
private AcknowledgeMode ackMode; private AcknowledgeMode acknowledgeMode;
/** /**
* Minimum number of consumers. * Minimum number of consumers.
@ -300,25 +301,27 @@ public class RabbitProperties {
private Integer maxConcurrency; private Integer maxConcurrency;
/** /**
* Message prefetch count. * Number of messages to be handled in a single request. It should be greater than
* or equal to the transaction size (if used).
*/ */
private Integer prefetch; private Integer prefetch;
/** /**
* Number of messages in a transaction. * Number of messages to be processed in a transaction. For best results it should
* be less than or equal to the prefetch count.
*/ */
private Integer txSize; private Integer transactionSize;
public AcknowledgeMode getAckMode() { public AcknowledgeMode getAcknowledgeMode() {
return ackMode; return this.acknowledgeMode;
} }
public void setAckMode(AcknowledgeMode ackMode) { public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
this.ackMode = ackMode; this.acknowledgeMode = acknowledgeMode;
} }
public Integer getConcurrency() { public Integer getConcurrency() {
return concurrency; return this.concurrency;
} }
public void setConcurrency(Integer concurrency) { public void setConcurrency(Integer concurrency) {
@ -326,7 +329,7 @@ public class RabbitProperties {
} }
public Integer getMaxConcurrency() { public Integer getMaxConcurrency() {
return maxConcurrency; return this.maxConcurrency;
} }
public void setMaxConcurrency(Integer maxConcurrency) { public void setMaxConcurrency(Integer maxConcurrency) {
@ -334,19 +337,19 @@ public class RabbitProperties {
} }
public Integer getPrefetch() { public Integer getPrefetch() {
return prefetch; return this.prefetch;
} }
public void setPrefetch(Integer prefetch) { public void setPrefetch(Integer prefetch) {
this.prefetch = prefetch; this.prefetch = prefetch;
} }
public Integer getTxSize() { public Integer getTransactionSize() {
return txSize; return this.transactionSize;
} }
public void setTxSize(Integer txSize) { public void setTransactionSize(Integer transactionSize) {
this.txSize = txSize; this.transactionSize = transactionSize;
} }
} }

View File

@ -187,6 +187,26 @@ public class RabbitAutoConfigurationTests {
verify(rabbitListenerContainerFactory).setTxSize(10); verify(rabbitListenerContainerFactory).setTxSize(10);
} }
@Test
public void testRabbitListenerContainerFactoryWithCustomSettings() {
load(TestConfiguration.class,
"spring.rabbitmq.listener.acknowledgeMode:manual",
"spring.rabbitmq.listener.concurrency:5",
"spring.rabbitmq.listener.maxConcurrency:10",
"spring.rabbitmq.listener.prefetch=40",
"spring.rabbitmq.listener.transactionSize:20");
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
.getBean("rabbitListenerContainerFactory",
SimpleRabbitListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory);
assertEquals(AcknowledgeMode.MANUAL,
dfa.getPropertyValue("acknowledgeMode"));
assertEquals(5, dfa.getPropertyValue("concurrentConsumers"));
assertEquals(10, dfa.getPropertyValue("maxConcurrentConsumers"));
assertEquals(40, dfa.getPropertyValue("prefetchCount"));
assertEquals(20, dfa.getPropertyValue("txSize"));
}
@Test @Test
public void enableRabbitAutomatically() throws Exception { public void enableRabbitAutomatically() throws Exception {
load(NoEnableRabbitConfiguration.class); load(NoEnableRabbitConfiguration.class);
@ -230,28 +250,6 @@ public class RabbitAutoConfigurationTests {
"spring.rabbitmq.ssl.trustStorePassword=secret"); "spring.rabbitmq.ssl.trustStorePassword=secret");
} }
@Test
public void testRabbitListenerContainerFactoryOverrides() {
load(TestConfiguration.class, "spring.rabbitmq.listener.prefetch:20",
"spring.rabbitmq.listener.ackMode:MANUAL",
"spring.rabbitmq.listener.concurrency:10",
"spring.rabbitmq.listener.maxConcurrency:10",
"spring.rabbitmq.listener.txSize:20");
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
.getBean("rabbitListenerContainerFactory",
SimpleRabbitListenerContainerFactory.class);
assertEquals(new Integer(20), (Integer)
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("prefetchCount"));
assertEquals(AcknowledgeMode.MANUAL, (AcknowledgeMode)
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("acknowledgeMode"));
assertEquals(new Integer(10), (Integer)
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("concurrentConsumers"));
assertEquals(new Integer(10), (Integer)
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("maxConcurrentConsumers"));
assertEquals(new Integer(20), (Integer)
new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("txSize"));
}
private com.rabbitmq.client.ConnectionFactory getTargetConnectionFactory() { private com.rabbitmq.client.ConnectionFactory getTargetConnectionFactory() {
CachingConnectionFactory connectionFactory = this.context CachingConnectionFactory connectionFactory = this.context
.getBean(CachingConnectionFactory.class); .getBean(CachingConnectionFactory.class);

View File

@ -469,6 +469,11 @@ content into your application; rather pick only the properties that you need.
spring.rabbitmq.port= # connection port spring.rabbitmq.port= # connection port
spring.rabbitmq.password= # login password spring.rabbitmq.password= # login password
spring.rabbitmq.requested-heartbeat= # requested heartbeat timeout, in seconds; zero for none spring.rabbitmq.requested-heartbeat= # requested heartbeat timeout, in seconds; zero for none
spring.rabbitmq.listener.acknowledge-mode= # acknowledge mode of container
spring.rabbitmq.listener.concurrency= # minimum number of consumers
spring.rabbitmq.listener.max-concurrency= # maximum number of consumers
spring.rabbitmq.listener.prefetch= # number of messages to be handled in a single request
spring.rabbitmq.listener.transaction-size= # number of messages to be processed in a transaction
spring.rabbitmq.ssl.enabled=false # enable SSL support spring.rabbitmq.ssl.enabled=false # enable SSL support
spring.rabbitmq.ssl.key-store= # path to the key store that holds the SSL certificate spring.rabbitmq.ssl.key-store= # path to the key store that holds the SSL certificate
spring.rabbitmq.ssl.key-store-password= # password used to access the key store spring.rabbitmq.ssl.key-store-password= # password used to access the key store