added "concurrency" property to Default/SimpleMessageListenerContainer and JmsActivationSpecConfig, supporting placeholders for the jms namespace "concurrency" attribute now (SPR-6232)

git-svn-id: https://src.springframework.org/svn/spring-framework/trunk@2811 50f2f4bb-b051-0410-bef5-90022cba6387
This commit is contained in:
Juergen Hoeller 2010-01-14 11:17:15 +00:00
parent 247b694fec
commit 2018118789
8 changed files with 170 additions and 59 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2009 the original author or authors. * Copyright 2002-2010 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -270,28 +270,4 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
return (Boolean) configDef.getPropertyValues().getPropertyValue("pubSubDomain").getValue(); return (Boolean) configDef.getPropertyValues().getPropertyValue("pubSubDomain").getValue();
} }
protected int[] parseConcurrency(Element ele, ParserContext parserContext) {
String concurrency = ele.getAttribute(CONCURRENCY_ATTRIBUTE);
if (!StringUtils.hasText(concurrency)) {
return null;
}
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
int[] result = new int[2];
result[0] = Integer.parseInt(concurrency.substring(0, separatorIndex));
result[1] = Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length()));
return result;
}
else {
return new int[] {1, Integer.parseInt(concurrency)};
}
}
catch (NumberFormatException ex) {
parserContext.getReaderContext().error("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.", ele, ex);
return null;
}
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2009 the original author or authors. * Copyright 2002-2010 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,12 +16,13 @@
package org.springframework.jms.config; package org.springframework.jms.config;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.ParserContext; import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.w3c.dom.Element;
/** /**
* Parser for the JMS <code>&lt;jca-listener-container&gt;</code> element. * Parser for the JMS <code>&lt;jca-listener-container&gt;</code> element.
@ -41,9 +42,8 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
containerDef.setSource(parserContext.extractSource(containerEle)); containerDef.setSource(parserContext.extractSource(containerEle));
containerDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsMessageEndpointManager"); containerDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsMessageEndpointManager");
String resourceAdapterBeanName = "resourceAdapter";
if (containerEle.hasAttribute(RESOURCE_ADAPTER_ATTRIBUTE)) { if (containerEle.hasAttribute(RESOURCE_ADAPTER_ATTRIBUTE)) {
resourceAdapterBeanName = containerEle.getAttribute(RESOURCE_ADAPTER_ATTRIBUTE); String resourceAdapterBeanName = containerEle.getAttribute(RESOURCE_ADAPTER_ATTRIBUTE);
if (!StringUtils.hasText(resourceAdapterBeanName)) { if (!StringUtils.hasText(resourceAdapterBeanName)) {
parserContext.getReaderContext().error( parserContext.getReaderContext().error(
"Listener container 'resource-adapter' attribute contains empty value.", containerEle); "Listener container 'resource-adapter' attribute contains empty value.", containerEle);
@ -88,14 +88,9 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
new RuntimeBeanReference(transactionManagerBeanName)); new RuntimeBeanReference(transactionManagerBeanName));
} }
int[] concurrency = parseConcurrency(containerEle, parserContext); String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE);
if (concurrency != null) { if (StringUtils.hasText(concurrency)) {
configDef.getPropertyValues().add("maxConcurrency", concurrency[1]); configDef.getPropertyValues().add("concurrency", concurrency);
}
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
} }
String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE); String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE);
@ -103,6 +98,11 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
configDef.getPropertyValues().add("prefetchSize", new Integer(prefetch)); configDef.getPropertyValues().add("prefetchSize", new Integer(prefetch));
} }
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
}
containerDef.getPropertyValues().add("activationSpecConfig", configDef); containerDef.getPropertyValues().add("activationSpecConfig", configDef);
return containerDef; return containerDef;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2009 the original author or authors. * Copyright 2002-2010 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -18,12 +18,13 @@ package org.springframework.jms.config;
import javax.jms.Session; import javax.jms.Session;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.ParserContext; import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.w3c.dom.Element;
/** /**
* Parser for the JMS <code>&lt;listener-container&gt;</code> element. * Parser for the JMS <code>&lt;listener-container&gt;</code> element.
@ -107,11 +108,6 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
new RuntimeBeanReference(destinationResolverBeanName)); new RuntimeBeanReference(destinationResolverBeanName));
} }
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
}
String cache = containerEle.getAttribute(CACHE_ATTRIBUTE); String cache = containerEle.getAttribute(CACHE_ATTRIBUTE);
if (StringUtils.hasText(cache)) { if (StringUtils.hasText(cache)) {
if (containerType.startsWith("simple")) { if (containerType.startsWith("simple")) {
@ -148,15 +144,9 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
} }
} }
int[] concurrency = parseConcurrency(containerEle, parserContext); String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE);
if (concurrency != null) { if (StringUtils.hasText(concurrency)) {
if (containerType.startsWith("default")) { containerDef.getPropertyValues().add("concurrency", concurrency);
containerDef.getPropertyValues().add("concurrentConsumers", concurrency[0]);
containerDef.getPropertyValues().add("maxConcurrentConsumers", concurrency[1]);
}
else {
containerDef.getPropertyValues().add("concurrentConsumers", concurrency[1]);
}
} }
String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE); String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE);
@ -166,6 +156,11 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
} }
} }
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
}
return containerDef; return containerDef;
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2009 the original author or authors. * Copyright 2002-2010 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -257,6 +257,31 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
} }
/**
* Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
* upper limit String, e.g. "10" (the lower limit will be 1 in this case).
* <p>This listener container will always hold on to the minimum number of consumers
* ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
* of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
*/
public void setConcurrency(String concurrency) {
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
}
else {
setConcurrentConsumers(1);
setMaxConcurrentConsumers(Integer.parseInt(concurrency));
}
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
}
}
/** /**
* Specify the number of concurrent consumers to create. Default is 1. * Specify the number of concurrent consumers to create. Default is 1.
* <p>Specifying a higher value for this setting will increase the standard * <p>Specifying a higher value for this setting will increase the standard

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2009 the original author or authors. * Copyright 2002-2010 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -89,6 +89,33 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
return this.pubSubNoLocal; return this.pubSubNoLocal;
} }
/**
* Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
* upper limit String, e.g. "10".
* <p>This listener container will always hold on to the maximum number of
* consumers {@link #setConcurrentConsumers} since it is unable to scale.
* <p>This property is primarily supported for configuration compatibility with
* {@link DefaultMessageListenerContainer}. For this local listener container,
* generally use {@link #setConcurrentConsumers} instead.
*/
public void setConcurrency(String concurrency) {
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
setConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
}
else {
setConcurrentConsumers(Integer.parseInt(concurrency));
}
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " +
"Note that SimpleMessageListenerContainer will effectively ignore the minimum value and " +
"always keep a fixed number of consumers according to the maximum value.");
}
}
/** /**
* Specify the number of concurrent consumers to create. Default is 1. * Specify the number of concurrent consumers to create. Default is 1.
* <p>Raising the number of concurrent consumers is recommendable in order * <p>Raising the number of concurrent consumers is recommendable in order

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2007 the original author or authors. * Copyright 2002-2010 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -18,6 +18,8 @@ package org.springframework.jms.listener.endpoint;
import javax.jms.Session; import javax.jms.Session;
import org.springframework.core.Constants;
/** /**
* Common configuration object for activating a JMS message endpoint. * Common configuration object for activating a JMS message endpoint.
* Gets converted into a provider-specific JCA 1.5 ActivationSpec * Gets converted into a provider-specific JCA 1.5 ActivationSpec
@ -34,6 +36,10 @@ import javax.jms.Session;
*/ */
public class JmsActivationSpecConfig { public class JmsActivationSpecConfig {
/** Constants instance for javax.jms.Session */
private static final Constants sessionConstants = new Constants(Session.class);
private String destinationName; private String destinationName;
private boolean pubSubDomain = false; private boolean pubSubDomain = false;
@ -101,26 +107,96 @@ public class JmsActivationSpecConfig {
return this.messageSelector; return this.messageSelector;
} }
/**
* Set the JMS acknowledgement mode by the name of the corresponding constant
* in the JMS {@link Session} interface, e.g. "CLIENT_ACKNOWLEDGE".
* <p>Note that JCA resource adapters generally only support auto and dups-ok
* (see Spring's {@link StandardJmsActivationSpecFactory}). ActiveMQ also
* supports "SESSION_TRANSACTED" in the form of RA-managed transactions
* (automatically translated by Spring's {@link DefaultJmsActivationSpecFactory}.
* @param constantName the name of the {@link Session} acknowledge mode constant
* @see javax.jms.Session#AUTO_ACKNOWLEDGE
* @see javax.jms.Session#CLIENT_ACKNOWLEDGE
* @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE
* @see javax.jms.Session#SESSION_TRANSACTED
* @see StandardJmsActivationSpecFactory
* @see DefaultJmsActivationSpecFactory
*/
public void setAcknowledgeModeName(String constantName) {
setAcknowledgeMode(sessionConstants.asNumber(constantName).intValue());
}
/**
* Set the JMS acknowledgement mode to use.
* @see javax.jms.Session#AUTO_ACKNOWLEDGE
* @see javax.jms.Session#CLIENT_ACKNOWLEDGE
* @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE
* @see javax.jms.Session#SESSION_TRANSACTED
*/
public void setAcknowledgeMode(int acknowledgeMode) { public void setAcknowledgeMode(int acknowledgeMode) {
this.acknowledgeMode = acknowledgeMode; this.acknowledgeMode = acknowledgeMode;
} }
/**
* Return the JMS acknowledgement mode to use.
*/
public int getAcknowledgeMode() { public int getAcknowledgeMode() {
return this.acknowledgeMode; return this.acknowledgeMode;
} }
/**
* Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
* upper limit String, e.g. "10".
* <p>JCA listener containers will always scale from zero to the given upper limit.
* A specified lower limit will effectively be ignored.
* <p>This property is primarily supported for configuration compatibility with
* {@link org.springframework.jms.listener.DefaultMessageListenerContainer}.
* For this activation config, generally use {@link #setMaxConcurrency} instead.
*/
public void setConcurrency(String concurrency) {
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
setMaxConcurrency(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
}
else {
setMaxConcurrency(Integer.parseInt(concurrency));
}
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " +
"Note that JmsActivationSpecConfig will effectively ignore the minimum value and " +
"scale from zero up to the number of consumers according to the maximum value.");
}
}
/**
* Specify the maximum number of consumers/sessions to use, effectively
* controlling the number of concurrent invocations on the target listener.
*/
public void setMaxConcurrency(int maxConcurrency) { public void setMaxConcurrency(int maxConcurrency) {
this.maxConcurrency = maxConcurrency; this.maxConcurrency = maxConcurrency;
} }
/**
* Return the maximum number of consumers/sessions to use.
*/
public int getMaxConcurrency() { public int getMaxConcurrency() {
return this.maxConcurrency; return this.maxConcurrency;
} }
/**
* Specify the maximum number of messages to load into a session
* (a kind of batch size).
*/
public void setPrefetchSize(int prefetchSize) { public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize; this.prefetchSize = prefetchSize;
} }
/**
* Return the maximum number of messages to load into a session.
*/
public int getPrefetchSize() { public int getPrefetchSize() {
return this.prefetchSize; return this.prefetchSize;
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2009 the original author or authors. * Copyright 2002-2010 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -86,9 +86,13 @@ public class JmsNamespaceHandlerTests extends TestCase {
for (DefaultMessageListenerContainer container : containers.values()) { for (DefaultMessageListenerContainer container : containers.values()) {
if (container.getConnectionFactory().equals(defaultConnectionFactory)) { if (container.getConnectionFactory().equals(defaultConnectionFactory)) {
defaultConnectionFactoryCount++; defaultConnectionFactoryCount++;
assertEquals(2, container.getConcurrentConsumers());
assertEquals(3, container.getMaxConcurrentConsumers());
} }
else if (container.getConnectionFactory().equals(explicitConnectionFactory)) { else if (container.getConnectionFactory().equals(explicitConnectionFactory)) {
explicitConnectionFactoryCount++; explicitConnectionFactoryCount++;
assertEquals(1, container.getConcurrentConsumers());
assertEquals(2, container.getMaxConcurrentConsumers());
} }
} }

View File

@ -7,16 +7,24 @@
<jms:listener-container connection-factory="testConnectionFactory" task-executor="testTaskExecutor" <jms:listener-container connection-factory="testConnectionFactory" task-executor="testTaskExecutor"
destination-resolver="testDestinationResolver" message-converter="testMessageConverter" destination-resolver="testDestinationResolver" message-converter="testMessageConverter"
transaction-manager="testTransactionManager" error-handler="testErrorHandler" phase="99"> transaction-manager="testTransactionManager" error-handler="testErrorHandler" concurrency="1-2" phase="99">
<jms:listener id="listener1" destination="testDestination" ref="testBean1" method="setName"/> <jms:listener id="listener1" destination="testDestination" ref="testBean1" method="setName"/>
<jms:listener id="listener2" destination="testDestination" ref="testBean2" method="setName" response-destination="responseDestination"/> <jms:listener id="listener2" destination="testDestination" ref="testBean2" method="setName" response-destination="responseDestination"/>
</jms:listener-container> </jms:listener-container>
<!-- TODO: remove the task-executor reference once issue with blocking on stop is resolved --> <!-- TODO: remove the task-executor reference once issue with blocking on stop is resolved -->
<jms:listener-container task-executor="testTaskExecutor"> <jms:listener-container task-executor="testTaskExecutor" concurrency="${concurrency}">
<jms:listener destination="testDestination" ref="testBean3"/> <jms:listener destination="testDestination" ref="testBean3"/>
</jms:listener-container> </jms:listener-container>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="properties">
<props>
<prop key="concurrency">2-3</prop>
</props>
</property>
</bean>
<jms:jca-listener-container resource-adapter="testResourceAdapter" activation-spec-factory="testActivationSpecFactory" <jms:jca-listener-container resource-adapter="testResourceAdapter" activation-spec-factory="testActivationSpecFactory"
message-converter="testMessageConverter" phase="77"> message-converter="testMessageConverter" phase="77">
<jms:listener id="listener3" destination="testDestination" ref="testBean1" method="setName"/> <jms:listener id="listener3" destination="testDestination" ref="testBean1" method="setName"/>