Customize default `JmsListenerFactory`
Expose acknowledgment mode and concurrency settings in configuration for the default `JmsListenerContainerFactory` Closes gh-3519
This commit is contained in:
parent
c0d0aeac5b
commit
22a7b0cdee
|
|
@ -67,6 +67,14 @@ class JmsAnnotationDrivenConfiguration {
|
||||||
if (this.destinationResolver != null) {
|
if (this.destinationResolver != null) {
|
||||||
factory.setDestinationResolver(this.destinationResolver);
|
factory.setDestinationResolver(this.destinationResolver);
|
||||||
}
|
}
|
||||||
|
JmsProperties.Listener listener = this.properties.getListener();
|
||||||
|
if (listener.getAcknowledgmentMode() != null) {
|
||||||
|
factory.setSessionAcknowledgeMode(listener.getAcknowledgmentMode().getMode());
|
||||||
|
}
|
||||||
|
String concurrency = listener.formatConcurrency();
|
||||||
|
if (concurrency != null) {
|
||||||
|
factory.setConcurrency(concurrency);
|
||||||
|
}
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2014 the original author or authors.
|
* Copyright 2012-2015 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -38,6 +38,8 @@ public class JmsProperties {
|
||||||
*/
|
*/
|
||||||
private String jndiName;
|
private String jndiName;
|
||||||
|
|
||||||
|
private final Listener listener = new Listener();
|
||||||
|
|
||||||
public boolean isPubSubDomain() {
|
public boolean isPubSubDomain() {
|
||||||
return this.pubSubDomain;
|
return this.pubSubDomain;
|
||||||
}
|
}
|
||||||
|
|
@ -54,4 +56,98 @@ public class JmsProperties {
|
||||||
this.jndiName = jndiName;
|
this.jndiName = jndiName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Listener getListener() {
|
||||||
|
return listener;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Listener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acknowledgment mode of the container. By default, the listener is
|
||||||
|
* transacted with automatic acknowledgment.
|
||||||
|
*/
|
||||||
|
private AcknowledgmentMode acknowledgmentMode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Minimum number of concurrent consumers.
|
||||||
|
*/
|
||||||
|
private Integer concurrency;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum number of concurrent consumers.
|
||||||
|
*/
|
||||||
|
private Integer maxConcurrency;
|
||||||
|
|
||||||
|
public AcknowledgmentMode getAcknowledgmentMode() {
|
||||||
|
return acknowledgmentMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAcknowledgmentMode(AcknowledgmentMode acknowledgmentMode) {
|
||||||
|
this.acknowledgmentMode = acknowledgmentMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getConcurrency() {
|
||||||
|
return this.concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConcurrency(Integer concurrency) {
|
||||||
|
this.concurrency = concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getMaxConcurrency() {
|
||||||
|
return this.maxConcurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxConcurrency(Integer maxConcurrency) {
|
||||||
|
this.maxConcurrency = maxConcurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String formatConcurrency() {
|
||||||
|
if (this.concurrency == null) {
|
||||||
|
return (this.maxConcurrency != null ? "1-" + this.maxConcurrency : null);
|
||||||
|
}
|
||||||
|
return (this.maxConcurrency != null ? this.concurrency + "-" +
|
||||||
|
this.maxConcurrency : String.valueOf(this.concurrency));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Translate the acknowledgment modes defined on the {@link javax.jms.Session}.
|
||||||
|
*
|
||||||
|
* <p>{@link javax.jms.Session#SESSION_TRANSACTED} is not defined as we take
|
||||||
|
* care of this already via a call to {@code setSessionTransacted}.
|
||||||
|
*/
|
||||||
|
public enum AcknowledgmentMode {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Messages sent or received from the session are automatically acknowledged. This
|
||||||
|
* is the simplest mode and enables once-only message delivery guarantee.
|
||||||
|
*/
|
||||||
|
AUTO(1),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Messages are acknowledged once the message listener implementation has
|
||||||
|
* called {@link javax.jms.Message#acknowledge()}. This mode gives the application
|
||||||
|
* (rather than the JMS provider) complete control over message acknowledgement.
|
||||||
|
*/
|
||||||
|
CLIENT(2),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to auto acknowledgment except that said acknowledgment is lazy. As a
|
||||||
|
* consequence, the messages might be delivered more than once. This mode enables
|
||||||
|
* at-least-once message delivery guarantee.
|
||||||
|
*/
|
||||||
|
DUPS_OK(3);
|
||||||
|
|
||||||
|
private final int mode;
|
||||||
|
|
||||||
|
AcknowledgmentMode(int mode) {
|
||||||
|
this.mode = mode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMode() {
|
||||||
|
return mode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2014 the original author or authors.
|
* Copyright 2012-2015 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -17,6 +17,7 @@
|
||||||
package org.springframework.boot.autoconfigure.jms;
|
package org.springframework.boot.autoconfigure.jms;
|
||||||
|
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.pool.PooledConnectionFactory;
|
import org.apache.activemq.pool.PooledConnectionFactory;
|
||||||
|
|
@ -143,6 +144,23 @@ public class JmsAutoConfigurationTests {
|
||||||
jmsListenerContainerFactory.getClass());
|
jmsListenerContainerFactory.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJmsListenerContainerFactoryWithCustomSettings() {
|
||||||
|
load(EnableJmsConfiguration.class,
|
||||||
|
"spring.jms.listener.acknowledgmentMode=client",
|
||||||
|
"spring.jms.listener.concurrency=2",
|
||||||
|
"spring.jms.listener.maxConcurrency=10");
|
||||||
|
JmsListenerContainerFactory<?> jmsListenerContainerFactory = this.context
|
||||||
|
.getBean("jmsListenerContainerFactory", JmsListenerContainerFactory.class);
|
||||||
|
assertEquals(DefaultJmsListenerContainerFactory.class,
|
||||||
|
jmsListenerContainerFactory.getClass());
|
||||||
|
DefaultMessageListenerContainer listenerContainer = ((DefaultJmsListenerContainerFactory)
|
||||||
|
jmsListenerContainerFactory).createListenerContainer(mock(JmsListenerEndpoint.class));
|
||||||
|
assertEquals(2, listenerContainer.getConcurrentConsumers());
|
||||||
|
assertEquals(10, listenerContainer.getMaxConcurrentConsumers());
|
||||||
|
assertEquals(Session.CLIENT_ACKNOWLEDGE, listenerContainer.getSessionAcknowledgeMode());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultContainerFactoryWithJtaTransactionManager() {
|
public void testDefaultContainerFactoryWithJtaTransactionManager() {
|
||||||
this.context = createContext(TestConfiguration7.class,
|
this.context = createContext(TestConfiguration7.class,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,59 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2012-2015 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.boot.autoconfigure.jms;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link JmsProperties}.
|
||||||
|
*
|
||||||
|
* @author Stephane Nicoll
|
||||||
|
*/
|
||||||
|
public class JmsPropertiesTests {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void formatConcurrencyNull() {
|
||||||
|
JmsProperties properties = new JmsProperties();
|
||||||
|
assertNull(properties.getListener().formatConcurrency());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void formatConcurrencyOnlyLowerBound() {
|
||||||
|
JmsProperties properties = new JmsProperties();
|
||||||
|
properties.getListener().setConcurrency(2);
|
||||||
|
assertEquals("2", properties.getListener().formatConcurrency());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void formatConcurrencyOnlyHigherBound() {
|
||||||
|
JmsProperties properties = new JmsProperties();
|
||||||
|
properties.getListener().setMaxConcurrency(5);
|
||||||
|
assertEquals("1-5", properties.getListener().formatConcurrency());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void formatConcurrencyBothBounds() {
|
||||||
|
JmsProperties properties = new JmsProperties();
|
||||||
|
properties.getListener().setConcurrency(2);
|
||||||
|
properties.getListener().setMaxConcurrency(10);
|
||||||
|
assertEquals("2-10", properties.getListener().formatConcurrency());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -523,6 +523,9 @@ content into your application; rather pick only the properties that you need.
|
||||||
|
|
||||||
# JMS ({sc-spring-boot-autoconfigure}/jms/JmsProperties.{sc-ext}[JmsProperties])
|
# JMS ({sc-spring-boot-autoconfigure}/jms/JmsProperties.{sc-ext}[JmsProperties])
|
||||||
spring.jms.jndi-name= # JNDI location of a JMS ConnectionFactory
|
spring.jms.jndi-name= # JNDI location of a JMS ConnectionFactory
|
||||||
|
spring.jms.listener.acknowledgment-mode= # session acknowledgment mode
|
||||||
|
spring.jms.listener.concurrency= # minimum number of concurrent consumers
|
||||||
|
spring.jms.listener.maxConcurrency= # maximum number of concurrent consumers
|
||||||
spring.jms.pub-sub-domain= # false for queue (default), true for topic
|
spring.jms.pub-sub-domain= # false for queue (default), true for topic
|
||||||
|
|
||||||
# Email ({sc-spring-boot-autoconfigure}/mail/MailProperties.{sc-ext}[MailProperties])
|
# Email ({sc-spring-boot-autoconfigure}/mail/MailProperties.{sc-ext}[MailProperties])
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue