Merge pull request #3479 from joshthornhill/gh-3475
* pr/3479:
Polish 1683d8a8
Customize default `RabbitListenerFactory`
This commit is contained in:
commit
812f8c8110
|
|
@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||||
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
||||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
|
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Listener;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
|
|
@ -29,6 +30,7 @@ import org.springframework.context.annotation.Configuration;
|
||||||
* Configuration for Spring AMQP annotation driven endpoints.
|
* Configuration for Spring AMQP annotation driven endpoints.
|
||||||
*
|
*
|
||||||
* @author Stephane Nicoll
|
* @author Stephane Nicoll
|
||||||
|
* @author Josh Thornhill
|
||||||
* @since 1.2.0
|
* @since 1.2.0
|
||||||
*/
|
*/
|
||||||
@Configuration
|
@Configuration
|
||||||
|
|
@ -41,6 +43,22 @@ class RabbitAnnotationDrivenConfiguration {
|
||||||
ConnectionFactory connectionFactory, RabbitProperties config) {
|
ConnectionFactory connectionFactory, RabbitProperties config) {
|
||||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
factory.setConnectionFactory(connectionFactory);
|
factory.setConnectionFactory(connectionFactory);
|
||||||
|
Listener listenerConfig = config.getListener();
|
||||||
|
if (listenerConfig.getAcknowledgeMode() != null) {
|
||||||
|
factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode());
|
||||||
|
}
|
||||||
|
if (listenerConfig.getConcurrency() != null) {
|
||||||
|
factory.setConcurrentConsumers(listenerConfig.getConcurrency());
|
||||||
|
}
|
||||||
|
if (listenerConfig.getMaxConcurrency() != null) {
|
||||||
|
factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency());
|
||||||
|
}
|
||||||
|
if (listenerConfig.getPrefetch() != null) {
|
||||||
|
factory.setPrefetchCount(listenerConfig.getPrefetch());
|
||||||
|
}
|
||||||
|
if (listenerConfig.getTransactionSize() != null) {
|
||||||
|
factory.setTxSize(listenerConfig.getTransactionSize());
|
||||||
|
}
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import java.util.LinkedHashSet;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.springframework.amqp.core.AcknowledgeMode;
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
|
||||||
|
|
@ -30,6 +31,7 @@ import org.springframework.util.StringUtils;
|
||||||
* @author Dave Syer
|
* @author Dave Syer
|
||||||
* @author Stephane Nicoll
|
* @author Stephane Nicoll
|
||||||
* @author Andy Wilkinson
|
* @author Andy Wilkinson
|
||||||
|
* @author Josh Thornhill
|
||||||
*/
|
*/
|
||||||
@ConfigurationProperties(prefix = "spring.rabbitmq")
|
@ConfigurationProperties(prefix = "spring.rabbitmq")
|
||||||
public class RabbitProperties {
|
public class RabbitProperties {
|
||||||
|
|
@ -74,6 +76,11 @@ public class RabbitProperties {
|
||||||
*/
|
*/
|
||||||
private Integer requestedHeartbeat;
|
private Integer requestedHeartbeat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listener container configuration.
|
||||||
|
*/
|
||||||
|
private final Listener listener = new Listener();
|
||||||
|
|
||||||
public String getHost() {
|
public String getHost() {
|
||||||
if (this.addresses == null) {
|
if (this.addresses == null) {
|
||||||
return this.host;
|
return this.host;
|
||||||
|
|
@ -180,6 +187,10 @@ public class RabbitProperties {
|
||||||
this.requestedHeartbeat = requestedHeartbeat;
|
this.requestedHeartbeat = requestedHeartbeat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Listener getListener() {
|
||||||
|
return this.listener;
|
||||||
|
}
|
||||||
|
|
||||||
public static class Ssl {
|
public static class Ssl {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -271,4 +282,75 @@ public class RabbitProperties {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class Listener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acknowledge mode of container.
|
||||||
|
*/
|
||||||
|
private AcknowledgeMode acknowledgeMode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Minimum number of consumers.
|
||||||
|
*/
|
||||||
|
private Integer concurrency;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum number of consumers.
|
||||||
|
*/
|
||||||
|
private Integer maxConcurrency;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of messages to be handled in a single request. It should be greater than
|
||||||
|
* or equal to the transaction size (if used).
|
||||||
|
*/
|
||||||
|
private Integer prefetch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of messages to be processed in a transaction. For best results it should
|
||||||
|
* be less than or equal to the prefetch count.
|
||||||
|
*/
|
||||||
|
private Integer transactionSize;
|
||||||
|
|
||||||
|
public AcknowledgeMode getAcknowledgeMode() {
|
||||||
|
return this.acknowledgeMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
|
||||||
|
this.acknowledgeMode = acknowledgeMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getConcurrency() {
|
||||||
|
return this.concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConcurrency(Integer concurrency) {
|
||||||
|
this.concurrency = concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getMaxConcurrency() {
|
||||||
|
return this.maxConcurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxConcurrency(Integer maxConcurrency) {
|
||||||
|
this.maxConcurrency = maxConcurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getPrefetch() {
|
||||||
|
return this.prefetch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPrefetch(Integer prefetch) {
|
||||||
|
this.prefetch = prefetch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getTransactionSize() {
|
||||||
|
return this.transactionSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTransactionSize(Integer transactionSize) {
|
||||||
|
this.transactionSize = transactionSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,8 @@ import org.junit.After;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
import org.springframework.amqp.core.AcknowledgeMode;
|
||||||
import org.springframework.amqp.core.AmqpAdmin;
|
import org.springframework.amqp.core.AmqpAdmin;
|
||||||
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||||
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
|
||||||
|
|
@ -185,6 +187,26 @@ public class RabbitAutoConfigurationTests {
|
||||||
verify(rabbitListenerContainerFactory).setTxSize(10);
|
verify(rabbitListenerContainerFactory).setTxSize(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRabbitListenerContainerFactoryWithCustomSettings() {
|
||||||
|
load(TestConfiguration.class,
|
||||||
|
"spring.rabbitmq.listener.acknowledgeMode:manual",
|
||||||
|
"spring.rabbitmq.listener.concurrency:5",
|
||||||
|
"spring.rabbitmq.listener.maxConcurrency:10",
|
||||||
|
"spring.rabbitmq.listener.prefetch=40",
|
||||||
|
"spring.rabbitmq.listener.transactionSize:20");
|
||||||
|
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
|
||||||
|
.getBean("rabbitListenerContainerFactory",
|
||||||
|
SimpleRabbitListenerContainerFactory.class);
|
||||||
|
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory);
|
||||||
|
assertEquals(AcknowledgeMode.MANUAL,
|
||||||
|
dfa.getPropertyValue("acknowledgeMode"));
|
||||||
|
assertEquals(5, dfa.getPropertyValue("concurrentConsumers"));
|
||||||
|
assertEquals(10, dfa.getPropertyValue("maxConcurrentConsumers"));
|
||||||
|
assertEquals(40, dfa.getPropertyValue("prefetchCount"));
|
||||||
|
assertEquals(20, dfa.getPropertyValue("txSize"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void enableRabbitAutomatically() throws Exception {
|
public void enableRabbitAutomatically() throws Exception {
|
||||||
load(NoEnableRabbitConfiguration.class);
|
load(NoEnableRabbitConfiguration.class);
|
||||||
|
|
|
||||||
|
|
@ -469,6 +469,11 @@ content into your application; rather pick only the properties that you need.
|
||||||
spring.rabbitmq.port= # connection port
|
spring.rabbitmq.port= # connection port
|
||||||
spring.rabbitmq.password= # login password
|
spring.rabbitmq.password= # login password
|
||||||
spring.rabbitmq.requested-heartbeat= # requested heartbeat timeout, in seconds; zero for none
|
spring.rabbitmq.requested-heartbeat= # requested heartbeat timeout, in seconds; zero for none
|
||||||
|
spring.rabbitmq.listener.acknowledge-mode= # acknowledge mode of container
|
||||||
|
spring.rabbitmq.listener.concurrency= # minimum number of consumers
|
||||||
|
spring.rabbitmq.listener.max-concurrency= # maximum number of consumers
|
||||||
|
spring.rabbitmq.listener.prefetch= # number of messages to be handled in a single request
|
||||||
|
spring.rabbitmq.listener.transaction-size= # number of messages to be processed in a transaction
|
||||||
spring.rabbitmq.ssl.enabled=false # enable SSL support
|
spring.rabbitmq.ssl.enabled=false # enable SSL support
|
||||||
spring.rabbitmq.ssl.key-store= # path to the key store that holds the SSL certificate
|
spring.rabbitmq.ssl.key-store= # path to the key store that holds the SSL certificate
|
||||||
spring.rabbitmq.ssl.key-store-password= # password used to access the key store
|
spring.rabbitmq.ssl.key-store-password= # password used to access the key store
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue