diff --git a/org.springframework.jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java b/org.springframework.jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java index 566cccb5b86..7b470f949dd 100644 --- a/org.springframework.jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java +++ b/org.springframework.jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2009 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -270,28 +270,4 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { return (Boolean) configDef.getPropertyValues().getPropertyValue("pubSubDomain").getValue(); } - protected int[] parseConcurrency(Element ele, ParserContext parserContext) { - String concurrency = ele.getAttribute(CONCURRENCY_ATTRIBUTE); - if (!StringUtils.hasText(concurrency)) { - return null; - } - try { - int separatorIndex = concurrency.indexOf('-'); - if (separatorIndex != -1) { - int[] result = new int[2]; - result[0] = Integer.parseInt(concurrency.substring(0, separatorIndex)); - result[1] = Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())); - return result; - } - else { - return new int[] {1, Integer.parseInt(concurrency)}; - } - } - catch (NumberFormatException ex) { - parserContext.getReaderContext().error("Invalid concurrency value [" + concurrency + "]: only " + - "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.", ele, ex); - return null; - } - } - } diff --git a/org.springframework.jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java b/org.springframework.jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java index cc2767fc7bd..9f415bd8fb1 100644 --- a/org.springframework.jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java +++ b/org.springframework.jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2009 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,13 @@ package org.springframework.jms.config; +import org.w3c.dom.Element; + import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.util.StringUtils; -import org.w3c.dom.Element; /** * Parser for the JMS <jca-listener-container> element. @@ -41,9 +42,8 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser { containerDef.setSource(parserContext.extractSource(containerEle)); containerDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsMessageEndpointManager"); - String resourceAdapterBeanName = "resourceAdapter"; if (containerEle.hasAttribute(RESOURCE_ADAPTER_ATTRIBUTE)) { - resourceAdapterBeanName = containerEle.getAttribute(RESOURCE_ADAPTER_ATTRIBUTE); + String resourceAdapterBeanName = containerEle.getAttribute(RESOURCE_ADAPTER_ATTRIBUTE); if (!StringUtils.hasText(resourceAdapterBeanName)) { parserContext.getReaderContext().error( "Listener container 'resource-adapter' attribute contains empty value.", containerEle); @@ -88,14 +88,9 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser { new RuntimeBeanReference(transactionManagerBeanName)); } - int[] concurrency = parseConcurrency(containerEle, parserContext); - if (concurrency != null) { - configDef.getPropertyValues().add("maxConcurrency", concurrency[1]); - } - - String phase = containerEle.getAttribute(PHASE_ATTRIBUTE); - if (StringUtils.hasText(phase)) { - containerDef.getPropertyValues().add("phase", phase); + String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE); + if (StringUtils.hasText(concurrency)) { + configDef.getPropertyValues().add("concurrency", concurrency); } String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE); @@ -103,6 +98,11 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser { configDef.getPropertyValues().add("prefetchSize", new Integer(prefetch)); } + String phase = containerEle.getAttribute(PHASE_ATTRIBUTE); + if (StringUtils.hasText(phase)) { + containerDef.getPropertyValues().add("phase", phase); + } + containerDef.getPropertyValues().add("activationSpecConfig", configDef); return containerDef; diff --git a/org.springframework.jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java b/org.springframework.jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java index 554c79baf6c..444eb9a757e 100644 --- a/org.springframework.jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java +++ b/org.springframework.jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2009 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,13 @@ package org.springframework.jms.config; import javax.jms.Session; +import org.w3c.dom.Element; + import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.util.StringUtils; -import org.w3c.dom.Element; /** * Parser for the JMS <listener-container> element. @@ -107,11 +108,6 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser { new RuntimeBeanReference(destinationResolverBeanName)); } - String phase = containerEle.getAttribute(PHASE_ATTRIBUTE); - if (StringUtils.hasText(phase)) { - containerDef.getPropertyValues().add("phase", phase); - } - String cache = containerEle.getAttribute(CACHE_ATTRIBUTE); if (StringUtils.hasText(cache)) { if (containerType.startsWith("simple")) { @@ -148,15 +144,9 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser { } } - int[] concurrency = parseConcurrency(containerEle, parserContext); - if (concurrency != null) { - if (containerType.startsWith("default")) { - containerDef.getPropertyValues().add("concurrentConsumers", concurrency[0]); - containerDef.getPropertyValues().add("maxConcurrentConsumers", concurrency[1]); - } - else { - containerDef.getPropertyValues().add("concurrentConsumers", concurrency[1]); - } + String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE); + if (StringUtils.hasText(concurrency)) { + containerDef.getPropertyValues().add("concurrency", concurrency); } String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE); @@ -166,6 +156,11 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser { } } + String phase = containerEle.getAttribute(PHASE_ATTRIBUTE); + if (StringUtils.hasText(phase)) { + containerDef.getPropertyValues().add("phase", phase); + } + return containerDef; } diff --git a/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java index 58224a22d28..5b6307d10c1 100644 --- a/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java +++ b/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2009 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -257,6 +257,31 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } + /** + * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple + * upper limit String, e.g. "10" (the lower limit will be 1 in this case). + *

This listener container will always hold on to the minimum number of consumers + * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number + * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load. + */ + public void setConcurrency(String concurrency) { + try { + int separatorIndex = concurrency.indexOf('-'); + if (separatorIndex != -1) { + setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex))); + setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length()))); + } + else { + setConcurrentConsumers(1); + setMaxConcurrentConsumers(Integer.parseInt(concurrency)); + } + } + catch (NumberFormatException ex) { + throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " + + "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported."); + } + } + /** * Specify the number of concurrent consumers to create. Default is 1. *

Specifying a higher value for this setting will increase the standard diff --git a/org.springframework.jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java b/org.springframework.jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java index 4a9d1233053..a47d1f99e21 100644 --- a/org.springframework.jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java +++ b/org.springframework.jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2009 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -89,6 +89,33 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta return this.pubSubNoLocal; } + /** + * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple + * upper limit String, e.g. "10". + *

This listener container will always hold on to the maximum number of + * consumers {@link #setConcurrentConsumers} since it is unable to scale. + *

This property is primarily supported for configuration compatibility with + * {@link DefaultMessageListenerContainer}. For this local listener container, + * generally use {@link #setConcurrentConsumers} instead. + */ + public void setConcurrency(String concurrency) { + try { + int separatorIndex = concurrency.indexOf('-'); + if (separatorIndex != -1) { + setConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length()))); + } + else { + setConcurrentConsumers(Integer.parseInt(concurrency)); + } + } + catch (NumberFormatException ex) { + throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " + + "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " + + "Note that SimpleMessageListenerContainer will effectively ignore the minimum value and " + + "always keep a fixed number of consumers according to the maximum value."); + } + } + /** * Specify the number of concurrent consumers to create. Default is 1. *

Raising the number of concurrent consumers is recommendable in order diff --git a/org.springframework.jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java b/org.springframework.jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java index f3a9f6f82f9..637790da506 100644 --- a/org.springframework.jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java +++ b/org.springframework.jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2007 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ package org.springframework.jms.listener.endpoint; import javax.jms.Session; +import org.springframework.core.Constants; + /** * Common configuration object for activating a JMS message endpoint. * Gets converted into a provider-specific JCA 1.5 ActivationSpec @@ -34,6 +36,10 @@ import javax.jms.Session; */ public class JmsActivationSpecConfig { + /** Constants instance for javax.jms.Session */ + private static final Constants sessionConstants = new Constants(Session.class); + + private String destinationName; private boolean pubSubDomain = false; @@ -101,26 +107,96 @@ public class JmsActivationSpecConfig { return this.messageSelector; } + /** + * Set the JMS acknowledgement mode by the name of the corresponding constant + * in the JMS {@link Session} interface, e.g. "CLIENT_ACKNOWLEDGE". + *

Note that JCA resource adapters generally only support auto and dups-ok + * (see Spring's {@link StandardJmsActivationSpecFactory}). ActiveMQ also + * supports "SESSION_TRANSACTED" in the form of RA-managed transactions + * (automatically translated by Spring's {@link DefaultJmsActivationSpecFactory}. + * @param constantName the name of the {@link Session} acknowledge mode constant + * @see javax.jms.Session#AUTO_ACKNOWLEDGE + * @see javax.jms.Session#CLIENT_ACKNOWLEDGE + * @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE + * @see javax.jms.Session#SESSION_TRANSACTED + * @see StandardJmsActivationSpecFactory + * @see DefaultJmsActivationSpecFactory + */ + public void setAcknowledgeModeName(String constantName) { + setAcknowledgeMode(sessionConstants.asNumber(constantName).intValue()); + } + + /** + * Set the JMS acknowledgement mode to use. + * @see javax.jms.Session#AUTO_ACKNOWLEDGE + * @see javax.jms.Session#CLIENT_ACKNOWLEDGE + * @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE + * @see javax.jms.Session#SESSION_TRANSACTED + */ public void setAcknowledgeMode(int acknowledgeMode) { this.acknowledgeMode = acknowledgeMode; } + /** + * Return the JMS acknowledgement mode to use. + */ public int getAcknowledgeMode() { return this.acknowledgeMode; } + /** + * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple + * upper limit String, e.g. "10". + *

JCA listener containers will always scale from zero to the given upper limit. + * A specified lower limit will effectively be ignored. + *

This property is primarily supported for configuration compatibility with + * {@link org.springframework.jms.listener.DefaultMessageListenerContainer}. + * For this activation config, generally use {@link #setMaxConcurrency} instead. + */ + public void setConcurrency(String concurrency) { + try { + int separatorIndex = concurrency.indexOf('-'); + if (separatorIndex != -1) { + setMaxConcurrency(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length()))); + } + else { + setMaxConcurrency(Integer.parseInt(concurrency)); + } + } + catch (NumberFormatException ex) { + throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " + + "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " + + "Note that JmsActivationSpecConfig will effectively ignore the minimum value and " + + "scale from zero up to the number of consumers according to the maximum value."); + } + } + + /** + * Specify the maximum number of consumers/sessions to use, effectively + * controlling the number of concurrent invocations on the target listener. + */ public void setMaxConcurrency(int maxConcurrency) { this.maxConcurrency = maxConcurrency; } + /** + * Return the maximum number of consumers/sessions to use. + */ public int getMaxConcurrency() { return this.maxConcurrency; } + /** + * Specify the maximum number of messages to load into a session + * (a kind of batch size). + */ public void setPrefetchSize(int prefetchSize) { this.prefetchSize = prefetchSize; } + /** + * Return the maximum number of messages to load into a session. + */ public int getPrefetchSize() { return this.prefetchSize; } diff --git a/org.springframework.jms/src/test/java/org/springframework/jms/config/JmsNamespaceHandlerTests.java b/org.springframework.jms/src/test/java/org/springframework/jms/config/JmsNamespaceHandlerTests.java index 6955b6fc398..acd9e4f474f 100644 --- a/org.springframework.jms/src/test/java/org/springframework/jms/config/JmsNamespaceHandlerTests.java +++ b/org.springframework.jms/src/test/java/org/springframework/jms/config/JmsNamespaceHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2009 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -86,9 +86,13 @@ public class JmsNamespaceHandlerTests extends TestCase { for (DefaultMessageListenerContainer container : containers.values()) { if (container.getConnectionFactory().equals(defaultConnectionFactory)) { defaultConnectionFactoryCount++; + assertEquals(2, container.getConcurrentConsumers()); + assertEquals(3, container.getMaxConcurrentConsumers()); } else if (container.getConnectionFactory().equals(explicitConnectionFactory)) { explicitConnectionFactoryCount++; + assertEquals(1, container.getConcurrentConsumers()); + assertEquals(2, container.getMaxConcurrentConsumers()); } } diff --git a/org.springframework.jms/src/test/java/org/springframework/jms/config/jmsNamespaceHandlerTests.xml b/org.springframework.jms/src/test/java/org/springframework/jms/config/jmsNamespaceHandlerTests.xml index 5634d157a70..e9cade3bac7 100644 --- a/org.springframework.jms/src/test/java/org/springframework/jms/config/jmsNamespaceHandlerTests.xml +++ b/org.springframework.jms/src/test/java/org/springframework/jms/config/jmsNamespaceHandlerTests.xml @@ -7,16 +7,24 @@ + transaction-manager="testTransactionManager" error-handler="testErrorHandler" concurrency="1-2" phase="99"> - + + + + + 2-3 + + + +