Revision of JMS annotated endpoint support, plus support for JMS 2.0's shared subscriptions

Issue: SPR-9882
Issue: SPR-11969
This commit is contained in:
Juergen Hoeller 2014-07-18 17:21:21 +02:00
parent 3e5946db37
commit a9100c427c
39 changed files with 873 additions and 786 deletions

View File

@ -28,8 +28,8 @@ import org.springframework.context.annotation.Import;
* Enable JMS listener annotated endpoints that are created under the cover
* by a {@link org.springframework.jms.config.JmsListenerContainerFactory
* JmsListenerContainerFactory}. To be used on
* @{@link org.springframework.context.annotation.Configuration Configuration} classes
* as follows:
* {@link org.springframework.context.annotation.Configuration Configuration}
* classes as follows:
*
* <pre class="code">
* &#064;Configuration
@ -52,8 +52,8 @@ import org.springframework.context.annotation.Import;
* used in the sample above, provides the necessary configuration options that are supported by
* the underlying {@link org.springframework.jms.listener.MessageListenerContainer MessageListenerContainer}.
*
* <p>{@code @EnableJms} enables detection of @{@link JmsListener} annotations on
* any Spring-managed bean in the container. For example, given a class {@code MyService}
* <p>{@code @EnableJms} enables detection of {@link JmsListener} annotations on any
* Spring-managed bean in the container. For example, given a class {@code MyService}:
*
* <pre class="code">
* package com.acme.foo;
@ -103,7 +103,7 @@ import org.springframework.context.annotation.Import;
*
* <p>Annotated methods can use flexible signature; in particular, it is possible to use
* the {@link org.springframework.messaging.Message Message} abstraction and related annotations,
* see @{@link JmsListener} Javadoc for more details. For instance, the following would
* see {@link JmsListener} Javadoc for more details. For instance, the following would
* inject the content of the message and a a custom "myCounter" JMS header:
*
* <pre class="code">
@ -163,7 +163,7 @@ import org.springframework.context.annotation.Import;
* are created and managed. The example below also demonstrates how to customize the
* {@code JmsHandlerMethodFactory} to use with a custom {@link org.springframework.validation.Validator
* Validator} so that payloads annotated with {@link org.springframework.validation.annotation.Validated
* @Validated} are first validated against a custom {@code Validator}.
* Validated} are first validated against a custom {@code Validator}.
*
* <pre class="code">
* &#064;Configuration

View File

@ -79,7 +79,7 @@ public @interface JmsListener {
/**
* The unique identifier of the container managing this endpoint.
* <p>if none is specified an auto-generated one is provided.
* @see org.springframework.jms.config.JmsListenerEndpointRegistry#getContainer(String)
* @see org.springframework.jms.config.JmsListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";

View File

@ -43,21 +43,21 @@ import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
* Bean post-processor that registers methods annotated with @{@link JmsListener}
* Bean post-processor that registers methods annotated with {@link JmsListener}
* to be invoked by a JMS message listener container created under the cover
* by a {@link org.springframework.jms.config.JmsListenerContainerFactory} according
* to the parameters of the annotation.
*
* <p>Annotated methods can use flexible arguments as defined by @{@link JmsListener}.
* <p>Annotated methods can use flexible arguments as defined by {@link JmsListener}.
*
* <p>This post-processor is automatically registered by Spring's
* {@code <jms:annotation-driven>} XML element, and also by the @{@link EnableJms}
* {@code <jms:annotation-driven>} XML element, and also by the {@link EnableJms}
* annotation.
*
* <p>Auto-detect any {@link JmsListenerConfigurer} instances in the container,
* allowing for customization of the registry to be used, the default container
* factory or for fine-grained control over endpoints registration. See
* @{@link EnableJms} Javadoc for complete usage details.
* {@link EnableJms} Javadoc for complete usage details.
*
* @author Stephane Nicoll
* @since 4.1
@ -68,7 +68,6 @@ import org.springframework.util.StringUtils;
* @see JmsListenerEndpointRegistry
* @see org.springframework.jms.config.AbstractJmsListenerEndpoint
* @see MethodJmsListenerEndpoint
* @see MessageListenerFactory
*/
public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
@ -78,9 +77,6 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
*/
static final String DEFAULT_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "jmsListenerContainerFactory";
private final AtomicInteger counter = new AtomicInteger();
private ApplicationContext applicationContext;
private JmsListenerEndpointRegistry endpointRegistry;
@ -88,11 +84,16 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
private final JmsHandlerMethodFactoryAdapter jmsHandlerMethodFactory = new JmsHandlerMethodFactoryAdapter();
private ApplicationContext applicationContext;
private final JmsListenerEndpointRegistrar registrar = new JmsListenerEndpointRegistrar();
private final AtomicInteger counter = new AtomicInteger();
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
public int getOrder() {
return LOWEST_PRECEDENCE;
}
/**
@ -105,8 +106,7 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
/**
* Set the name of the {@link JmsListenerContainerFactory} to use by default.
* <p/>If none is specified, {@value #DEFAULT_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME}
* is assumed to be defined.
* <p>If none is specified, "jmsListenerContainerFactory" is assumed to be defined.
*/
public void setContainerFactoryBeanName(String containerFactoryBeanName) {
this.containerFactoryBeanName = containerFactoryBeanName;
@ -125,10 +125,11 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
}
@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
@ -162,17 +163,17 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
catch (NoSuchMethodException ex) {
throw new IllegalStateException(String.format(
"@JmsListener method '%s' found on bean target class '%s', " +
"but not found in any interface(s) for bean JDK proxy. Either " +
"pull the method up to an interface or switch to subclass (CGLIB) " +
"proxies by setting proxy-target-class/proxyTargetClass " +
"attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()));
"but not found in any interface(s) for bean JDK proxy. Either " +
"pull the method up to an interface or switch to subclass (CGLIB) " +
"proxies by setting proxy-target-class/proxyTargetClass " +
"attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()));
}
}
MethodJmsListenerEndpoint endpoint = new MethodJmsListenerEndpoint();
endpoint.setBean(bean);
endpoint.setMethod(method);
endpoint.setJmsHandlerMethodFactory(jmsHandlerMethodFactory);
endpoint.setJmsHandlerMethodFactory(this.jmsHandlerMethodFactory);
endpoint.setId(getEndpointId(jmsListener));
endpoint.setDestination(jmsListener.destination());
if (StringUtils.hasText(jmsListener.selector())) {
@ -189,12 +190,12 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
String containerFactoryBeanName = jmsListener.containerFactory();
if (StringUtils.hasText(containerFactoryBeanName)) {
try {
factory = applicationContext.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
factory = this.applicationContext.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException e) {
throw new BeanInitializationException("Could not register jms listener endpoint on ["
+ method + "], no " + JmsListenerContainerFactory.class.getSimpleName() + " with id '"
+ containerFactoryBeanName + "' was found in the application context", e);
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register jms listener endpoint on [" +
method + "], no " + JmsListenerContainerFactory.class.getSimpleName() + " with id '" +
containerFactoryBeanName + "' was found in the application context", ex);
}
}
@ -214,22 +215,20 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
configurer.configureJmsListeners(registrar);
}
registrar.setApplicationContext(this.applicationContext);
this.registrar.setApplicationContext(this.applicationContext);
if (registrar.getEndpointRegistry() == null) {
if (endpointRegistry == null) {
endpointRegistry = applicationContext
.getBean(AnnotationConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
JmsListenerEndpointRegistry.class);
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
this.endpointRegistry = this.applicationContext.getBean(
AnnotationConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);
}
registrar.setEndpointRegistry(endpointRegistry);
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
if (this.containerFactoryBeanName != null) {
registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
JmsHandlerMethodFactory handlerMethodFactory = registrar.getJmsHandlerMethodFactory();
if (handlerMethodFactory != null) {
@ -238,10 +237,10 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
// Create all the listeners and starts them
try {
registrar.afterPropertiesSet();
this.registrar.afterPropertiesSet();
}
catch (Exception e) {
throw new BeanInitializationException(e.getMessage(), e);
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize JmsListenerEndpointRegistrar", ex);
}
}
@ -250,11 +249,11 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
return jmsListener.id();
}
else {
return "org.springframework.jms.JmsListenerEndpointContainer#"
+ counter.getAndIncrement();
return "org.springframework.jms.JmsListenerEndpointContainer#" + counter.getAndIncrement();
}
}
/**
* An {@link JmsHandlerMethodFactory} adapter that offers a configurable underlying
* instance to use. Useful if the factory to use is determined once the endpoints
@ -265,7 +264,7 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
private JmsHandlerMethodFactory jmsHandlerMethodFactory;
private void setJmsHandlerMethodFactory(JmsHandlerMethodFactory jmsHandlerMethodFactory) {
public void setJmsHandlerMethodFactory(JmsHandlerMethodFactory jmsHandlerMethodFactory) {
this.jmsHandlerMethodFactory = jmsHandlerMethodFactory;
}
@ -275,10 +274,10 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
}
private JmsHandlerMethodFactory getJmsHandlerMethodFactory() {
if (jmsHandlerMethodFactory == null) {
jmsHandlerMethodFactory = createDefaultJmsHandlerMethodFactory();
if (this.jmsHandlerMethodFactory == null) {
this.jmsHandlerMethodFactory = createDefaultJmsHandlerMethodFactory();
}
return jmsHandlerMethodFactory;
return this.jmsHandlerMethodFactory;
}
private JmsHandlerMethodFactory createDefaultJmsHandlerMethodFactory() {

View File

@ -16,7 +16,6 @@
package org.springframework.jms.config;
import javax.jms.ConnectionFactory;
import org.apache.commons.logging.Log;
@ -55,8 +54,13 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
private Boolean subscriptionDurable;
private Boolean subscriptionShared;
private String clientId;
private Integer phase;
/**
* @see AbstractMessageListenerContainer#setConnectionFactory(ConnectionFactory)
*/
@ -113,6 +117,13 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
this.subscriptionDurable = subscriptionDurable;
}
/**
* @see AbstractMessageListenerContainer#setSubscriptionShared(boolean)
*/
public void setSubscriptionShared(Boolean subscriptionShared) {
this.subscriptionShared = subscriptionShared;
}
/**
* @see AbstractMessageListenerContainer#setClientId(String)
*/
@ -121,12 +132,15 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
}
/**
* Create an empty container instance.
* @see AbstractMessageListenerContainer#setPhase(int)
*/
protected abstract C createContainerInstance();
public void setPhase(int phase) {
this.phase = phase;
}
@Override
public C createMessageListenerContainer(JmsListenerEndpoint endpoint) {
public C createListenerContainer(JmsListenerEndpoint endpoint) {
C instance = createContainerInstance();
if (this.connectionFactory != null) {
@ -141,38 +155,45 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
if (this.messageConverter != null) {
instance.setMessageConverter(this.messageConverter);
}
if (this.sessionTransacted != null) {
instance.setSessionTransacted(this.sessionTransacted);
}
if (this.sessionAcknowledgeMode != null) {
instance.setSessionAcknowledgeMode(this.sessionAcknowledgeMode);
}
if (this.pubSubDomain != null) {
instance.setPubSubDomain(this.pubSubDomain);
}
if (this.subscriptionDurable != null) {
instance.setSubscriptionDurable(this.subscriptionDurable);
}
if (this.subscriptionShared != null) {
instance.setSubscriptionShared(this.subscriptionShared);
}
if (this.clientId != null) {
instance.setClientId(this.clientId);
}
if (this.phase != null) {
instance.setPhase(this.phase);
}
endpoint.setupMessageContainer(instance);
endpoint.setupListenerContainer(instance);
initializeContainer(instance);
return instance;
}
/**
* Create an empty container instance.
*/
protected abstract C createContainerInstance();
/**
* Further initialize the specified container.
* <p>Subclasses can inherit from this method to apply extra
* configuration if necessary.
*/
protected void initializeContainer(C instance) {
}
}

View File

@ -112,60 +112,34 @@ public abstract class AbstractJmsListenerEndpoint implements JmsListenerEndpoint
* Return the concurrency for the listener, if any.
*/
public String getConcurrency() {
return concurrency;
return this.concurrency;
}
@Override
public void setupMessageContainer(MessageListenerContainer container) {
if (container instanceof AbstractMessageListenerContainer) { // JMS
setupJmsMessageContainer((AbstractMessageListenerContainer) container);
}
else if (container instanceof JmsMessageEndpointManager) { // JCA
setupJcaMessageContainer((JmsMessageEndpointManager) container);
public void setupListenerContainer(MessageListenerContainer listenerContainer) {
if (listenerContainer instanceof AbstractMessageListenerContainer) {
setupJmsListenerContainer((AbstractMessageListenerContainer) listenerContainer);
}
else {
throw new IllegalArgumentException("Could not configure endpoint with the specified container '" +
container + "' Only JMS (" + AbstractMessageListenerContainer.class.getName() +
" subclass) or JCA (" + JmsMessageEndpointManager.class.getName() + ") are supported.");
new JcaEndpointConfigurer().configureEndpoint(listenerContainer);
}
}
protected void setupJmsMessageContainer(AbstractMessageListenerContainer container) {
private void setupJmsListenerContainer(AbstractMessageListenerContainer listenerContainer) {
if (getDestination() != null) {
container.setDestinationName(getDestination());
listenerContainer.setDestinationName(getDestination());
}
if (getSubscription() != null) {
container.setDurableSubscriptionName(getSubscription());
listenerContainer.setSubscriptionName(getSubscription());
}
if (getSelector() != null) {
container.setMessageSelector(getSelector());
listenerContainer.setMessageSelector(getSelector());
}
if (getConcurrency() != null) {
container.setConcurrency(getConcurrency());
listenerContainer.setConcurrency(getConcurrency());
}
setupMessageListener(container);
}
protected void setupJcaMessageContainer(JmsMessageEndpointManager container) {
JmsActivationSpecConfig activationSpecConfig = container.getActivationSpecConfig();
if (activationSpecConfig == null) {
activationSpecConfig = new JmsActivationSpecConfig();
container.setActivationSpecConfig(activationSpecConfig);
}
if (getDestination() != null) {
activationSpecConfig.setDestinationName(getDestination());
}
if (getSubscription() != null) {
activationSpecConfig.setDurableSubscriptionName(getSubscription());
}
if (getSelector() != null) {
activationSpecConfig.setMessageSelector(getSelector());
}
if (getConcurrency() != null) {
activationSpecConfig.setConcurrency(getConcurrency());
}
setupMessageListener(container);
setupMessageListener(listenerContainer);
}
/**
@ -196,4 +170,43 @@ public abstract class AbstractJmsListenerEndpoint implements JmsListenerEndpoint
return getEndpointDescription().toString();
}
/**
* Inner class to avoid a hard dependency on the JCA API.
*/
private class JcaEndpointConfigurer {
public void configureEndpoint(Object listenerContainer) {
if (listenerContainer instanceof JmsMessageEndpointManager) {
setupJcaMessageContainer((JmsMessageEndpointManager) listenerContainer);
}
else {
throw new IllegalArgumentException("Could not configure endpoint with the specified container '" +
listenerContainer + "' Only JMS (" + AbstractMessageListenerContainer.class.getName() +
" subclass) or JCA (" + JmsMessageEndpointManager.class.getName() + ") are supported.");
}
}
private void setupJcaMessageContainer(JmsMessageEndpointManager container) {
JmsActivationSpecConfig activationSpecConfig = container.getActivationSpecConfig();
if (activationSpecConfig == null) {
activationSpecConfig = new JmsActivationSpecConfig();
container.setActivationSpecConfig(activationSpecConfig);
}
if (getDestination() != null) {
activationSpecConfig.setDestinationName(getDestination());
}
if (getSubscription() != null) {
activationSpecConfig.setSubscriptionName(getSubscription());
}
if (getSelector() != null) {
activationSpecConfig.setMessageSelector(getSelector());
}
if (getConcurrency() != null) {
activationSpecConfig.setConcurrency(getConcurrency());
}
setupMessageListener(container);
}
}
}

View File

@ -74,6 +74,10 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
protected static final String DESTINATION_TYPE_DURABLE_TOPIC = "durableTopic";
protected static final String DESTINATION_TYPE_SHARED_TOPIC = "sharedTopic";
protected static final String DESTINATION_TYPE_SHARED_DURABLE_TOPIC = "sharedDurableTopic";
protected static final String CLIENT_ID_ATTRIBUTE = "client-id";
protected static final String ACKNOWLEDGE_ATTRIBUTE = "acknowledge";
@ -98,14 +102,21 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
CompositeComponentDefinition compositeDef =
new CompositeComponentDefinition(element.getTagName(), parserContext.extractSource(element));
new CompositeComponentDefinition(element.getTagName(), parserContext.extractSource(element));
parserContext.pushContainingComponent(compositeDef);
// First parse the common configuration of the container
PropertyValues containerValues = parseProperties(element, parserContext);
PropertyValues commonProperties = parseCommonContainerProperties(element, parserContext);
PropertyValues specificProperties = parseSpecificContainerProperties(element, parserContext);
// Expose the factory if requested
parseContainerFactory(element, parserContext, containerValues);
String factoryId = element.getAttribute(FACTORY_ID_ATTRIBUTE);
if (StringUtils.hasText(factoryId)) {
RootBeanDefinition beanDefinition = createContainerFactory(
factoryId, element, parserContext, commonProperties, specificProperties);
if (beanDefinition != null) {
beanDefinition.setSource(parserContext.extractSource(element));
parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinition, factoryId));
}
}
NodeList childNodes = element.getChildNodes();
for (int i = 0; i < childNodes.getLength(); i++) {
@ -113,9 +124,7 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
if (child.getNodeType() == Node.ELEMENT_NODE) {
String localName = parserContext.getDelegate().getLocalName(child);
if (LISTENER_ELEMENT.equals(localName)) {
ListenerContainerParserContext context = new ListenerContainerParserContext(
element, (Element) child, containerValues, parserContext);
parseListener(context);
parseListener(element, (Element) child, parserContext, commonProperties, specificProperties);
}
}
}
@ -124,45 +133,12 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
return null;
}
/**
* Parse the common properties for all listeners as defined by the specified
* container {@link Element}.
*/
protected abstract PropertyValues parseProperties(Element containerEle, ParserContext parserContext);
/**
* Create the {@link BeanDefinition} for the container factory using the specified
* shared property values.
*/
protected abstract RootBeanDefinition createContainerFactory(String factoryId,
Element containerElement, PropertyValues propertyValues);
/**
* Create the container {@link BeanDefinition} for the specified context.
*/
protected abstract BeanDefinition createContainer(ListenerContainerParserContext context);
private void parseContainerFactory(Element element, ParserContext parserContext, PropertyValues propertyValues) {
String factoryId = element.getAttribute(FACTORY_ID_ATTRIBUTE);
if (StringUtils.hasText(factoryId)) {
RootBeanDefinition beanDefinition = createContainerFactory(factoryId, element, propertyValues);
if (beanDefinition != null) {
beanDefinition.setSource(parserContext.extractSource(element));
parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinition, factoryId));
}
}
}
private void parseListener(ListenerContainerParserContext context) {
ParserContext parserContext = context.getParserContext();
PropertyValues containerValues = context.getContainerValues();
Element listenerEle = context.getListenerElement();
private void parseListener(Element containerEle, Element listenerEle, ParserContext parserContext,
PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) {
RootBeanDefinition listenerDef = new RootBeanDefinition();
listenerDef.setSource(parserContext.extractSource(listenerEle));
BeanDefinition containerDef = createContainer(context);
listenerDef.setBeanClassName("org.springframework.jms.listener.adapter.MessageListenerAdapter");
String ref = listenerEle.getAttribute(REF_ATTRIBUTE);
if (!StringUtils.hasText(ref)) {
@ -183,15 +159,18 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
}
listenerDef.getPropertyValues().add("defaultListenerMethod", method);
PropertyValue messageConverterPv = getMessageConverter(containerValues);
PropertyValue messageConverterPv = commonContainerProperties.getPropertyValue("messageConverter");
if (messageConverterPv != null) {
listenerDef.getPropertyValues().addPropertyValue(messageConverterPv);
}
BeanDefinition containerDef = createContainer(
containerEle, listenerEle, parserContext, commonContainerProperties, specificContainerProperties);
containerDef.getPropertyValues().add("messageListener", listenerDef);
if (listenerEle.hasAttribute(RESPONSE_DESTINATION_ATTRIBUTE)) {
String responseDestination = listenerEle.getAttribute(RESPONSE_DESTINATION_ATTRIBUTE);
boolean pubSubDomain = indicatesPubSub(containerValues);
Boolean pubSubDomain = (Boolean) commonContainerProperties.getPropertyValue("pubSubDomain").getValue();
listenerDef.getPropertyValues().add(
pubSubDomain ? "defaultResponseTopicName" : "defaultResponseQueueName", responseDestination);
if (containerDef.getPropertyValues().contains("destinationResolver")) {
@ -200,9 +179,6 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
}
}
listenerDef.setBeanClassName("org.springframework.jms.listener.adapter.MessageListenerAdapter");
containerDef.getPropertyValues().add("messageListener", listenerDef);
String containerBeanName = listenerEle.getAttribute(ID_ATTRIBUTE);
// If no bean id is given auto generate one using the ReaderContext's BeanNameGenerator
@ -214,21 +190,13 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
parserContext.registerBeanComponent(new BeanComponentDefinition(containerDef, containerBeanName));
}
protected boolean indicatesPubSub(PropertyValues propertyValues) {
return false;
}
protected PropertyValue getMessageConverter(PropertyValues containerValues) {
return containerValues.getPropertyValue("messageConverter");
}
protected void parseListenerConfiguration(Element ele, ParserContext parserContext, BeanDefinition configDef) {
protected void parseListenerConfiguration(Element ele, ParserContext parserContext, MutablePropertyValues configValues) {
String destination = ele.getAttribute(DESTINATION_ATTRIBUTE);
if (!StringUtils.hasText(destination)) {
parserContext.getReaderContext().error(
"Listener 'destination' attribute contains empty value.", ele);
}
configDef.getPropertyValues().add("destinationName", destination);
configValues.add("destinationName", destination);
if (ele.hasAttribute(SUBSCRIPTION_ATTRIBUTE)) {
String subscription = ele.getAttribute(SUBSCRIPTION_ATTRIBUTE);
@ -236,7 +204,7 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
parserContext.getReaderContext().error(
"Listener 'subscription' attribute contains empty value.", ele);
}
configDef.getPropertyValues().add("durableSubscriptionName", subscription);
configValues.add("subscriptionName", subscription);
}
if (ele.hasAttribute(SELECTOR_ATTRIBUTE)) {
@ -245,7 +213,7 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
parserContext.getReaderContext().error(
"Listener 'selector' attribute contains empty value.", ele);
}
configDef.getPropertyValues().add("messageSelector", selector);
configValues.add("messageSelector", selector);
}
if (ele.hasAttribute(CONCURRENCY_ATTRIBUTE)) {
@ -254,17 +222,27 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
parserContext.getReaderContext().error(
"Listener 'concurrency' attribute contains empty value.", ele);
}
configDef.getPropertyValues().add("concurrency", concurrency);
configValues.add("concurrency", concurrency);
}
}
protected PropertyValues parseCommonContainerProperties(Element ele, ParserContext parserContext) {
MutablePropertyValues propertyValues = new MutablePropertyValues();
protected MutablePropertyValues parseCommonContainerProperties(Element containerEle, ParserContext parserContext) {
MutablePropertyValues properties = new MutablePropertyValues();
String destinationType = ele.getAttribute(DESTINATION_TYPE_ATTRIBUTE);
String destinationType = containerEle.getAttribute(DESTINATION_TYPE_ATTRIBUTE);
boolean pubSubDomain = false;
boolean subscriptionDurable = false;
if (DESTINATION_TYPE_DURABLE_TOPIC.equals(destinationType)) {
boolean subscriptionShared = false;
if (DESTINATION_TYPE_SHARED_DURABLE_TOPIC.equals(destinationType)) {
pubSubDomain = true;
subscriptionDurable = true;
subscriptionShared = true;
}
else if (DESTINATION_TYPE_SHARED_TOPIC.equals(destinationType)) {
pubSubDomain = true;
subscriptionShared = true;
}
else if (DESTINATION_TYPE_DURABLE_TOPIC.equals(destinationType)) {
pubSubDomain = true;
subscriptionDurable = true;
}
@ -275,23 +253,57 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
// the default: queue
}
else {
parserContext.getReaderContext().error("Invalid listener container 'destination-type': " +
"only \"queue\", \"topic\" and \"durableTopic\" supported.", ele);
parserContext.getReaderContext().error("Invalid listener container 'destination-type': only " +
"\"queue\", \"topic\", \"durableTopic\", \"sharedTopic\", \"sharedDurableTopic\" supported.", containerEle);
}
propertyValues.add("pubSubDomain", pubSubDomain);
propertyValues.add("subscriptionDurable", subscriptionDurable);
properties.add("pubSubDomain", pubSubDomain);
properties.add("subscriptionDurable", subscriptionDurable);
properties.add("subscriptionShared", subscriptionShared);
if (ele.hasAttribute(CLIENT_ID_ATTRIBUTE)) {
String clientId = ele.getAttribute(CLIENT_ID_ATTRIBUTE);
if (containerEle.hasAttribute(CLIENT_ID_ATTRIBUTE)) {
String clientId = containerEle.getAttribute(CLIENT_ID_ATTRIBUTE);
if (!StringUtils.hasText(clientId)) {
parserContext.getReaderContext().error(
"Listener 'client-id' attribute contains empty value.", ele);
"Listener 'client-id' attribute contains empty value.", containerEle);
}
propertyValues.add("clientId", clientId);
properties.add("clientId", clientId);
}
return propertyValues;
if (containerEle.hasAttribute(MESSAGE_CONVERTER_ATTRIBUTE)) {
String messageConverter = containerEle.getAttribute(MESSAGE_CONVERTER_ATTRIBUTE);
if (!StringUtils.hasText(messageConverter)) {
parserContext.getReaderContext().error(
"listener container 'message-converter' attribute contains empty value.", containerEle);
}
else {
properties.add("messageConverter", new RuntimeBeanReference(messageConverter));
}
}
return properties;
}
/**
* Parse the common properties for all listeners as defined by the specified
* container {@link Element}.
*/
protected abstract MutablePropertyValues parseSpecificContainerProperties(Element containerEle, ParserContext parserContext);
/**
* Create the {@link BeanDefinition} for the container factory using the specified
* shared property values.
*/
protected abstract RootBeanDefinition createContainerFactory(String factoryId, Element containerEle, ParserContext parserContext,
PropertyValues commonContainerProperties, PropertyValues specificContainerProperties);
/**
* Create the container {@link BeanDefinition} for the specified context.
*/
protected abstract RootBeanDefinition createContainer(Element containerEle, Element listenerEle, ParserContext parserContext,
PropertyValues commonContainerProperties, PropertyValues specificContainerProperties);
protected Integer parseAcknowledgeMode(Element ele, ParserContext parserContext) {
String acknowledge = ele.getAttribute(ACKNOWLEDGE_ATTRIBUTE);
if (StringUtils.hasText(acknowledge)) {
@ -316,45 +328,4 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
}
}
protected boolean indicatesPubSubConfig(PropertyValues configuration) {
return (Boolean) configuration.getPropertyValue("pubSubDomain").getValue();
}
protected static class ListenerContainerParserContext {
private final Element containerElement;
private final Element listenerElement;
private final PropertyValues containerValues;
private final ParserContext parserContext;
public ListenerContainerParserContext(Element containerElement, Element listenerElement,
PropertyValues containerValues, ParserContext parserContext) {
this.containerElement = containerElement;
this.listenerElement = listenerElement;
this.containerValues = containerValues;
this.parserContext = parserContext;
}
public Element getContainerElement() {
return containerElement;
}
public Element getListenerElement() {
return listenerElement;
}
public PropertyValues getContainerValues() {
return containerValues;
}
public ParserContext getParserContext() {
return parserContext;
}
public Object getSource() {
return parserContext.extractSource(containerElement);
}
}
}

View File

@ -18,7 +18,6 @@ package org.springframework.jms.config;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
@ -36,7 +35,7 @@ import org.springframework.util.StringUtils;
* @author Stephane Nicoll
* @since 4.1
*/
final class AnnotationDrivenJmsBeanDefinitionParser implements BeanDefinitionParser {
class AnnotationDrivenJmsBeanDefinitionParser implements BeanDefinitionParser {
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
@ -89,7 +88,6 @@ final class AnnotationDrivenJmsBeanDefinitionParser implements BeanDefinitionPar
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
"org.springframework.jms.config.JmsListenerEndpointRegistry");
builder.getRawBeanDefinition().setSource(source);
builder.setAutowireMode(AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE);
registerInfrastructureBean(parserContext, builder, AnnotationConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME);
}

View File

@ -25,12 +25,13 @@ import org.springframework.jms.support.destination.DestinationResolver;
/**
* A {@link JmsListenerContainerFactory} implementation to build a
* JCA {@link JmsMessageEndpointManager}.
* JCA-based {@link JmsMessageEndpointManager}.
*
* @author Stephane Nicoll
* @since 4.1
*/
public class DefaultJcaListenerContainerFactory implements JmsListenerContainerFactory<JmsMessageEndpointManager> {
public class DefaultJcaListenerContainerFactory extends JmsActivationSpecConfig
implements JmsListenerContainerFactory<JmsMessageEndpointManager> {
private ResourceAdapter resourceAdapter;
@ -40,7 +41,8 @@ public class DefaultJcaListenerContainerFactory implements JmsListenerContainerF
private Object transactionManager;
private JmsActivationSpecConfig activationSpecConfig;
private Integer phase;
/**
* @see JmsMessageEndpointManager#setResourceAdapter(ResourceAdapter)
@ -71,28 +73,15 @@ public class DefaultJcaListenerContainerFactory implements JmsListenerContainerF
}
/**
* @see JmsMessageEndpointManager#setActivationSpecConfig(JmsActivationSpecConfig)
* @see JmsMessageEndpointManager#setPhase(int)
*/
public void setActivationSpecConfig(JmsActivationSpecConfig activationSpecConfig) {
this.activationSpecConfig = activationSpecConfig;
public void setPhase(int phase) {
this.phase = phase;
}
/**
* Return the current {@link JmsActivationSpecConfig}.
*/
public JmsActivationSpecConfig getActivationSpecConfig() {
return activationSpecConfig;
}
/**
* Create an empty container instance.
*/
protected JmsMessageEndpointManager createContainerInstance() {
return new JmsMessageEndpointManager();
}
@Override
public JmsMessageEndpointManager createMessageListenerContainer(JmsListenerEndpoint endpoint) {
public JmsMessageEndpointManager createListenerContainer(JmsListenerEndpoint endpoint) {
if (this.destinationResolver != null && this.activationSpecFactory != null) {
throw new IllegalStateException("Specify either 'activationSpecFactory' or " +
"'destinationResolver', not both. If you define a dedicated JmsActivationSpecFactory bean, " +
@ -113,13 +102,21 @@ public class DefaultJcaListenerContainerFactory implements JmsListenerContainerF
if (this.transactionManager != null) {
instance.setTransactionManager(this.transactionManager);
}
if (this.activationSpecConfig != null) {
instance.setActivationSpecConfig(this.activationSpecConfig);
if (this.phase != null) {
instance.setPhase(this.phase);
}
endpoint.setupMessageContainer(instance);
instance.setActivationSpecConfig(this);
endpoint.setupListenerContainer(instance);
return instance;
}
/**
* Create an empty container instance.
*/
protected JmsMessageEndpointManager createContainerInstance() {
return new JmsMessageEndpointManager();
}
}

View File

@ -23,12 +23,11 @@ import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.backoff.BackOff;
/**
* A {@link JmsListenerContainerFactory} implementation to build regular
* A {@link JmsListenerContainerFactory} implementation to build a regular
* {@link DefaultMessageListenerContainer}.
*
* <p>This should be the default for most users and a good transition
* paths for those that are used to build such container definition
* manually.
* <p>This should be the default for most users and a good transition paths
* for those that are used to build such container definition manually.
*
* @author Stephane Nicoll
* @since 4.1

View File

@ -19,9 +19,7 @@ package org.springframework.jms.config;
import org.w3c.dom.Element;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.PropertyValue;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.ParserContext;
@ -42,83 +40,64 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
@Override
protected PropertyValues parseProperties(Element containerEle, ParserContext parserContext) {
final MutablePropertyValues properties = new MutablePropertyValues();
PropertyValues containerValues = parseContainerProperties(containerEle, parserContext);
protected RootBeanDefinition createContainerFactory(String factoryId, Element containerEle, ParserContext parserContext,
PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) {
// Common values are added to the activationSpecConfig
PropertyValues commonValues = parseCommonContainerProperties(containerEle, parserContext);
BeanDefinition beanDefinition = getActivationSpecConfigBeanDefinition(containerValues);
beanDefinition.getPropertyValues().addPropertyValues(commonValues);
RootBeanDefinition factoryDef = new RootBeanDefinition();
factoryDef.setBeanClassName("org.springframework.jms.config.DefaultJcaListenerContainerFactory");
properties.addPropertyValues(containerValues);
return properties;
factoryDef.getPropertyValues().addPropertyValues(commonContainerProperties);
factoryDef.getPropertyValues().addPropertyValues(specificContainerProperties);
return factoryDef;
}
@Override
protected RootBeanDefinition createContainerFactory(String factoryId,
Element containerElement, PropertyValues propertyValues) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClassName("org.springframework.jms.config.DefaultJcaListenerContainerFactory");
beanDefinition.getPropertyValues().addPropertyValues(propertyValues);
return beanDefinition;
}
protected RootBeanDefinition createContainer(Element containerEle, Element listenerEle, ParserContext parserContext,
PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) {
@Override
protected BeanDefinition createContainer(ListenerContainerParserContext context) {
RootBeanDefinition containerDef = new RootBeanDefinition();
containerDef.setSource(context.getSource());
containerDef.setSource(parserContext.extractSource(containerEle));
containerDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsMessageEndpointManager");
containerDef.getPropertyValues().addPropertyValues(specificContainerProperties);
applyContainerValues(context, containerDef);
RootBeanDefinition configDef = new RootBeanDefinition();
configDef.setSource(parserContext.extractSource(containerEle));
configDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsActivationSpecConfig");
configDef.getPropertyValues().addPropertyValues(commonContainerProperties);
parseListenerConfiguration(listenerEle, parserContext, configDef.getPropertyValues());
BeanDefinition activationSpec = getActivationSpecConfigBeanDefinition(containerDef.getPropertyValues());
parseListenerConfiguration(context.getListenerElement(), context.getParserContext(), activationSpec);
String phase = context.getContainerElement().getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
}
containerDef.getPropertyValues().add("activationSpecConfig", configDef);
return containerDef;
}
/**
* The property values provided by the factory element contains a mutable property (the
* activation spec config). To avoid changing the bean definition from the parent, a clone
* bean definition is created for the container being configured.
*/
private void applyContainerValues(ListenerContainerParserContext context, RootBeanDefinition containerDef) {
// Apply settings from the container
containerDef.getPropertyValues().addPropertyValues(context.getContainerValues());
@Override
protected MutablePropertyValues parseCommonContainerProperties(Element containerEle, ParserContext parserContext) {
MutablePropertyValues properties = super.parseCommonContainerProperties(containerEle, parserContext);
// Clone the activationSpecConfig property value as it is mutable
PropertyValue pv = containerDef.getPropertyValues().getPropertyValue("activationSpecConfig");
RootBeanDefinition activationSpecConfig = new RootBeanDefinition((RootBeanDefinition) pv.getValue());
containerDef.getPropertyValues().add("activationSpecConfig", activationSpecConfig);
Integer acknowledgeMode = parseAcknowledgeMode(containerEle, parserContext);
if (acknowledgeMode != null) {
properties.add("acknowledgeMode", acknowledgeMode);
}
String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE);
if (StringUtils.hasText(concurrency)) {
properties.add("concurrency", concurrency);
}
String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE);
if (StringUtils.hasText(prefetch)) {
properties.add("prefetchSize", new Integer(prefetch));
}
return properties;
}
@Override
protected boolean indicatesPubSub(PropertyValues propertyValues) {
BeanDefinition configDef = getActivationSpecConfigBeanDefinition(propertyValues);
return indicatesPubSubConfig(configDef.getPropertyValues());
}
protected MutablePropertyValues parseSpecificContainerProperties(Element containerEle, ParserContext parserContext) {
MutablePropertyValues properties = new MutablePropertyValues();
@Override
protected PropertyValue getMessageConverter(PropertyValues containerValues) {
BeanDefinition configDef = getActivationSpecConfigBeanDefinition(containerValues);
return super.getMessageConverter(configDef.getPropertyValues());
}
private BeanDefinition getActivationSpecConfigBeanDefinition(PropertyValues containerValues) {
PropertyValue activationSpecConfig = containerValues.getPropertyValue("activationSpecConfig");
return (BeanDefinition) activationSpecConfig.getValue();
}
private PropertyValues parseContainerProperties(Element containerEle,
ParserContext parserContext) {
MutablePropertyValues propertyValues = new MutablePropertyValues();
if (containerEle.hasAttribute(RESOURCE_ADAPTER_ATTRIBUTE)) {
String resourceAdapterBeanName = containerEle.getAttribute(RESOURCE_ADAPTER_ATTRIBUTE);
if (!StringUtils.hasText(resourceAdapterBeanName)) {
@ -126,8 +105,7 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
"Listener container 'resource-adapter' attribute contains empty value.", containerEle);
}
else {
propertyValues.add("resourceAdapter",
new RuntimeBeanReference(resourceAdapterBeanName));
properties.add("resourceAdapter", new RuntimeBeanReference(resourceAdapterBeanName));
}
}
@ -139,54 +117,23 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
"'destination-resolver', not both. If you define a dedicated JmsActivationSpecFactory bean, " +
"specify the custom DestinationResolver there (if possible).", containerEle);
}
propertyValues.add("activationSpecFactory",
new RuntimeBeanReference(activationSpecFactoryBeanName));
properties.add("activationSpecFactory", new RuntimeBeanReference(activationSpecFactoryBeanName));
}
if (StringUtils.hasText(destinationResolverBeanName)) {
propertyValues.add("destinationResolver",
new RuntimeBeanReference(destinationResolverBeanName));
properties.add("destinationResolver", new RuntimeBeanReference(destinationResolverBeanName));
}
String transactionManagerBeanName = containerEle.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE);
if (StringUtils.hasText(transactionManagerBeanName)) {
propertyValues.add("transactionManager",
new RuntimeBeanReference(transactionManagerBeanName));
properties.add("transactionManager", new RuntimeBeanReference(transactionManagerBeanName));
}
RootBeanDefinition configDef = new RootBeanDefinition();
configDef.setSource(parserContext.extractSource(configDef));
configDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsActivationSpecConfig");
Integer acknowledgeMode = parseAcknowledgeMode(containerEle, parserContext);
if (acknowledgeMode != null) {
configDef.getPropertyValues().add("acknowledgeMode", acknowledgeMode);
}
String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE);
if (StringUtils.hasText(concurrency)) {
configDef.getPropertyValues().add("concurrency", concurrency);
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
properties.add("phase", phase);
}
String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE);
if (StringUtils.hasText(prefetch)) {
configDef.getPropertyValues().add("prefetchSize", new Integer(prefetch));
}
if (containerEle.hasAttribute(MESSAGE_CONVERTER_ATTRIBUTE)) {
String messageConverter = containerEle.getAttribute(MESSAGE_CONVERTER_ATTRIBUTE);
if (!StringUtils.hasText(messageConverter)) {
parserContext.getReaderContext().error(
"listener container 'message-converter' attribute contains empty value.", containerEle);
}
else {
configDef.getPropertyValues().add("messageConverter",
new RuntimeBeanReference(messageConverter));
}
}
propertyValues.add("activationSpecConfig", configDef);
return propertyValues;
return properties;
}
}

View File

@ -33,6 +33,6 @@ public interface JmsListenerContainerFactory<C extends MessageListenerContainer>
* @param endpoint the endpoint to configure
* @return the created container
*/
C createMessageListenerContainer(JmsListenerEndpoint endpoint);
C createListenerContainer(JmsListenerEndpoint endpoint);
}

View File

@ -22,7 +22,6 @@ import org.w3c.dom.Element;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.ParserContext;
@ -57,43 +56,39 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
private static final String BACK_OFF_ATTRIBUTE = "back-off";
protected PropertyValues parseProperties(Element containerEle, ParserContext parserContext) {
final MutablePropertyValues properties = new MutablePropertyValues();
PropertyValues commonValues = parseCommonContainerProperties(containerEle, parserContext);
PropertyValues containerValues = parseContainerProperties(containerEle,
parserContext, isSimpleContainer(containerEle));
properties.addPropertyValues(commonValues);
properties.addPropertyValues(containerValues);
return properties;
}
@Override
protected RootBeanDefinition createContainerFactory(String factoryId, Element containerEle, PropertyValues propertyValues) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
protected RootBeanDefinition createContainerFactory(String factoryId, Element containerEle, ParserContext parserContext,
PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) {
RootBeanDefinition factoryDef = new RootBeanDefinition();
String containerType = containerEle.getAttribute(CONTAINER_TYPE_ATTRIBUTE);
String containerClass = containerEle.getAttribute(CONTAINER_CLASS_ATTRIBUTE);
if (!"".equals(containerClass)) {
return null; // Not supported
}
else if ("".equals(containerType) || containerType.startsWith("default")) {
beanDefinition.setBeanClassName("org.springframework.jms.config.DefaultJmsListenerContainerFactory");
factoryDef.setBeanClassName("org.springframework.jms.config.DefaultJmsListenerContainerFactory");
}
else if (containerType.startsWith("simple")) {
beanDefinition.setBeanClassName("org.springframework.jms.config.SimpleJmsListenerContainerFactory");
factoryDef.setBeanClassName("org.springframework.jms.config.SimpleJmsListenerContainerFactory");
}
beanDefinition.getPropertyValues().addPropertyValues(propertyValues);
return beanDefinition;
factoryDef.getPropertyValues().addPropertyValues(commonContainerProperties);
factoryDef.getPropertyValues().addPropertyValues(specificContainerProperties);
return factoryDef;
}
@Override
protected BeanDefinition createContainer(ListenerContainerParserContext context) {
protected RootBeanDefinition createContainer(Element containerEle, Element listenerEle, ParserContext parserContext,
PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) {
RootBeanDefinition containerDef = new RootBeanDefinition();
containerDef.setSource(context.getSource());
containerDef.setSource(parserContext.extractSource(containerEle));
containerDef.getPropertyValues().addPropertyValues(commonContainerProperties);
containerDef.getPropertyValues().addPropertyValues(specificContainerProperties);
// Set all container values
containerDef.getPropertyValues().addPropertyValues(context.getContainerValues());
Element containerEle = context.getContainerElement();
String containerType = containerEle.getAttribute(CONTAINER_TYPE_ATTRIBUTE);
String containerClass = containerEle.getAttribute(CONTAINER_CLASS_ATTRIBUTE);
if (!"".equals(containerClass)) {
@ -106,29 +101,22 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
containerDef.setBeanClassName("org.springframework.jms.listener.SimpleMessageListenerContainer");
}
else {
context.getParserContext().getReaderContext().error(
parserContext.getReaderContext().error(
"Invalid 'container-type' attribute: only \"default\" and \"simple\" supported.", containerEle);
}
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
}
// Parse listener specific settings
parseListenerConfiguration(context.getListenerElement(), context.getParserContext(), containerDef);
parseListenerConfiguration(listenerEle, parserContext, containerDef.getPropertyValues());
return containerDef;
}
@Override
protected boolean indicatesPubSub(PropertyValues propertyValues) {
return indicatesPubSubConfig(propertyValues);
}
protected MutablePropertyValues parseSpecificContainerProperties(Element containerEle, ParserContext parserContext) {
MutablePropertyValues properties = new MutablePropertyValues();
boolean isSimpleContainer = containerEle.getAttribute(CONTAINER_TYPE_ATTRIBUTE).startsWith("simple");
private PropertyValues parseContainerProperties(Element containerEle,
ParserContext parserContext, boolean isSimpleContainer) {
MutablePropertyValues propertyValues = new MutablePropertyValues();
String connectionFactoryBeanName = "connectionFactory";
if (containerEle.hasAttribute(CONNECTION_FACTORY_ATTRIBUTE)) {
connectionFactoryBeanName = containerEle.getAttribute(CONNECTION_FACTORY_ATTRIBUTE);
@ -138,38 +126,22 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
}
}
if (StringUtils.hasText(connectionFactoryBeanName)) {
propertyValues.add("connectionFactory",
new RuntimeBeanReference(connectionFactoryBeanName));
properties.add("connectionFactory", new RuntimeBeanReference(connectionFactoryBeanName));
}
String taskExecutorBeanName = containerEle.getAttribute(TASK_EXECUTOR_ATTRIBUTE);
if (StringUtils.hasText(taskExecutorBeanName)) {
propertyValues.add("taskExecutor",
new RuntimeBeanReference(taskExecutorBeanName));
properties.add("taskExecutor", new RuntimeBeanReference(taskExecutorBeanName));
}
String errorHandlerBeanName = containerEle.getAttribute(ERROR_HANDLER_ATTRIBUTE);
if (StringUtils.hasText(errorHandlerBeanName)) {
propertyValues.add("errorHandler",
new RuntimeBeanReference(errorHandlerBeanName));
}
if (containerEle.hasAttribute(MESSAGE_CONVERTER_ATTRIBUTE)) {
String messageConverter = containerEle.getAttribute(MESSAGE_CONVERTER_ATTRIBUTE);
if (!StringUtils.hasText(messageConverter)) {
parserContext.getReaderContext().error(
"listener container 'message-converter' attribute contains empty value.", containerEle);
}
else {
propertyValues.add("messageConverter",
new RuntimeBeanReference(messageConverter));
}
properties.add("errorHandler", new RuntimeBeanReference(errorHandlerBeanName));
}
String destinationResolverBeanName = containerEle.getAttribute(DESTINATION_RESOLVER_ATTRIBUTE);
if (StringUtils.hasText(destinationResolverBeanName)) {
propertyValues.add("destinationResolver",
new RuntimeBeanReference(destinationResolverBeanName));
properties.add("destinationResolver", new RuntimeBeanReference(destinationResolverBeanName));
}
String cache = containerEle.getAttribute(CACHE_ATTRIBUTE);
@ -182,17 +154,17 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
}
}
else {
propertyValues.add("cacheLevelName", "CACHE_" + cache.toUpperCase());
properties.add("cacheLevelName", "CACHE_" + cache.toUpperCase());
}
}
Integer acknowledgeMode = parseAcknowledgeMode(containerEle, parserContext);
if (acknowledgeMode != null) {
if (acknowledgeMode == Session.SESSION_TRANSACTED) {
propertyValues.add("sessionTransacted", Boolean.TRUE);
properties.add("sessionTransacted", Boolean.TRUE);
}
else {
propertyValues.add("sessionAcknowledgeMode", acknowledgeMode);
properties.add("sessionAcknowledgeMode", acknowledgeMode);
}
}
@ -203,51 +175,50 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
"'transaction-manager' attribute not supported for listener container of type \"simple\".", containerEle);
}
else {
propertyValues.add("transactionManager",
new RuntimeBeanReference(transactionManagerBeanName));
properties.add("transactionManager", new RuntimeBeanReference(transactionManagerBeanName));
}
}
String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE);
if (StringUtils.hasText(concurrency)) {
propertyValues.add("concurrency", concurrency);
properties.add("concurrency", concurrency);
}
String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE);
if (StringUtils.hasText(prefetch)) {
if (!isSimpleContainer) {
propertyValues.add("maxMessagesPerTask", prefetch);
properties.add("maxMessagesPerTask", prefetch);
}
}
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
properties.add("phase", phase);
}
String receiveTimeout = containerEle.getAttribute(RECEIVE_TIMEOUT_ATTRIBUTE);
if (StringUtils.hasText(receiveTimeout)) {
if (!isSimpleContainer) {
propertyValues.add("receiveTimeout", receiveTimeout);
properties.add("receiveTimeout", receiveTimeout);
}
}
String backOffBeanName = containerEle.getAttribute(BACK_OFF_ATTRIBUTE);
if (StringUtils.hasText(backOffBeanName)) {
if (!isSimpleContainer) {
propertyValues.add("backOff", new RuntimeBeanReference(backOffBeanName));
properties.add("backOff", new RuntimeBeanReference(backOffBeanName));
}
}
else { // No need to consider this if back-off is set
String recoveryInterval = containerEle.getAttribute(RECOVERY_INTERVAL_ATTRIBUTE);
if (StringUtils.hasText(recoveryInterval)) {
if (!isSimpleContainer) {
propertyValues.add("recoveryInterval", recoveryInterval);
properties.add("recoveryInterval", recoveryInterval);
}
}
}
return propertyValues;
}
private boolean isSimpleContainer(Element containerEle) {
String containerType = containerEle.getAttribute(CONTAINER_TYPE_ATTRIBUTE);
return containerType.startsWith("simple");
return properties;
}
}

View File

@ -41,8 +41,8 @@ public interface JmsListenerEndpoint {
* setting the {@code destination} and the {@code messageListener} to
* use but an implementation may override any default setting that
* was already set.
* @param container the container to configure
* @param listenerContainer the listener container to configure
*/
void setupMessageContainer(MessageListenerContainer container);
void setupListenerContainer(MessageListenerContainer listenerContainer);
}

View File

@ -44,8 +44,8 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In
private ApplicationContext applicationContext;
private final List<JmsListenerEndpointDescriptor> endpointDescriptors
= new ArrayList<JmsListenerEndpointDescriptor>();
private final List<JmsListenerEndpointDescriptor> endpointDescriptors =
new ArrayList<JmsListenerEndpointDescriptor>();
/**
* Set the {@link JmsListenerEndpointRegistry} instance to use.
@ -73,11 +73,10 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In
}
/**
* Set the {@link JmsListenerContainerFactory} to use in case a
* {@link JmsListenerEndpoint} is registered with a {@code null} container
* factory.
* <p>Alternatively, the bean name of the {@link JmsListenerContainerFactory}
* to use can be specified for a lazy lookup, see {@see #setContainerFactoryBeanName}
* Set the {@link JmsListenerContainerFactory} to use in case a {@link JmsListenerEndpoint}
* is registered with a {@code null} container factory.
* <p>Alternatively, the bean name of the {@link JmsListenerContainerFactory} to use
* can be specified for a lazy lookup, see {@link #setContainerFactoryBeanName}.
*/
public void setContainerFactory(JmsListenerContainerFactory<?> containerFactory) {
this.containerFactory = containerFactory;
@ -108,10 +107,10 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In
}
/**
* Register a new {@link JmsListenerEndpoint} alongside the {@link JmsListenerContainerFactory}
* to use to create the underlying container.
* <p>The {@code factory} may be {@code null} if the default factory has to be used for that
* endpoint.
* Register a new {@link JmsListenerEndpoint} alongside the
* {@link JmsListenerContainerFactory} to use to create the underlying container.
* <p>The {@code factory} may be {@code null} if the default factory has to be
* used for that endpoint.
*/
public void registerEndpoint(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
@ -121,9 +120,8 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In
}
/**
* Register a new {@link JmsListenerEndpoint} using the default {@link JmsListenerContainerFactory}
* to create the underlying container.
*
* Register a new {@link JmsListenerEndpoint} using the default
* {@link JmsListenerContainerFactory} to create the underlying container.
* @see #setContainerFactory(JmsListenerContainerFactory)
* @see #registerEndpoint(JmsListenerEndpoint, JmsListenerContainerFactory)
*/
@ -139,7 +137,7 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In
protected void startAllEndpoints() throws Exception {
for (JmsListenerEndpointDescriptor descriptor : endpointDescriptors) {
endpointRegistry.createJmsListenerContainer(
endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
}
@ -152,25 +150,25 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In
return this.containerFactory;
}
else if (this.containerFactoryBeanName != null) {
this.containerFactory = applicationContext.getBean(
this.containerFactory = this.applicationContext.getBean(
this.containerFactoryBeanName, JmsListenerContainerFactory.class);
return this.containerFactory; // Consider changing this if live change of the factory is required
return this.containerFactory; // Consider changing this if live change of the factory is required
}
else {
throw new IllegalStateException("Could not resolve the "
+ JmsListenerContainerFactory.class.getSimpleName() + " to use for ["
+ descriptor.endpoint + "] no factory was given and no default is set.");
throw new IllegalStateException("Could not resolve the " +
JmsListenerContainerFactory.class.getSimpleName() + " to use for [" +
descriptor.endpoint + "] no factory was given and no default is set.");
}
}
private static class JmsListenerEndpointDescriptor {
private final JmsListenerEndpoint endpoint;
private final JmsListenerContainerFactory<?> containerFactory;
public final JmsListenerEndpoint endpoint;
private JmsListenerEndpointDescriptor(JmsListenerEndpoint endpoint,
JmsListenerContainerFactory<?> containerFactory) {
public final JmsListenerContainerFactory<?> containerFactory;
public JmsListenerEndpointDescriptor(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> containerFactory) {
this.endpoint = endpoint;
this.containerFactory = containerFactory;
}

View File

@ -18,39 +18,49 @@ package org.springframework.jms.config;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.util.Assert;
/**
* Create the necessary {@link MessageListenerContainer} instances
* for the registered {@linkplain JmsListenerEndpoint endpoints}. Also
* manage the lifecycle of the containers, in particular with the
* lifecycle of the application context.
* Creates the necessary {@link MessageListenerContainer} instances for the
* registered {@linkplain JmsListenerEndpoint endpoints}. Also manages the
* lifecycle of the listener containers, in particular within the lifecycle
* of the application context.
*
* <p>Contrary to {@link MessageListenerContainer} created manually,
* containers managed by this instances are not registered in the
* application context and are not candidates for autowiring. Use
* {@link #getContainers()} if you need to access the containers
* of this instance for management purposes. If you need to access
* a particular container, use {@link #getContainer(String)} with the
* id of the endpoint.
* <p>Contrary to {@link MessageListenerContainer}s created manually, listener
* containers managed by registry are not beans in the application context and
* are not candidates for autowiring. Use {@link #getListenerContainers()} if
* you need to access this registry's listener containers for management purposes.
* If you need to access to a specific message listener container, use
* {@link #getListenerContainer(String)} with the id of the endpoint.
*
* @author Stephane Nicoll
* @author Juergen Hoeller
* @since 4.1
* @see JmsListenerEndpoint
* @see MessageListenerContainer
* @see JmsListenerContainerFactory
*/
public class JmsListenerEndpointRegistry implements DisposableBean {
public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecycle {
protected final Log logger = LogFactory.getLog(getClass());
private final Map<String, MessageListenerContainer> listenerContainers =
new LinkedHashMap<String, MessageListenerContainer>();
private int phase = Integer.MAX_VALUE;
private final Map<String, MessageListenerContainer> containers =
new HashMap<String, MessageListenerContainer>();
/**
* Return the {@link MessageListenerContainer} with the specified id or
@ -59,72 +69,149 @@ public class JmsListenerEndpointRegistry implements DisposableBean {
* @return the container or {@code null} if no container with that id exists
* @see JmsListenerEndpoint#getId()
*/
public MessageListenerContainer getContainer(String id) {
Assert.notNull(id, "the container identifier must be set.");
return containers.get(id);
public MessageListenerContainer getListenerContainer(String id) {
Assert.notNull(id, "Container identifier must not be null");
return this.listenerContainers.get(id);
}
/**
* Return the managed {@link MessageListenerContainer} instance(s).
*/
public Collection<MessageListenerContainer> getContainers() {
return Collections.unmodifiableCollection(containers.values());
public Collection<MessageListenerContainer> getListenerContainers() {
return Collections.unmodifiableCollection(this.listenerContainers.values());
}
/**
* Create a message listener container for the given {@link JmsListenerEndpoint}.
* <p>This create the necessary infrastructure to honor that endpoint
* with regards to its configuration.
* @param endpoint the endpoint to add
* @see #getContainers()
* @see #getContainer(String)
* @see #getListenerContainers()
* @see #getListenerContainer(String)
*/
public void createJmsListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.notNull(id, "Endpoint id must not be null");
Assert.state(!containers.containsKey(id), "another endpoint is already " +
"registered with id '" + id + "'");
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = doCreateJmsListenerContainer(endpoint, factory);
containers.put(id, container);
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
}
/**
* Create and start a new container using the specified factory.
*/
protected MessageListenerContainer doCreateJmsListenerContainer(JmsListenerEndpoint endpoint,
protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint,
JmsListenerContainerFactory<?> factory) {
MessageListenerContainer container = factory.createMessageListenerContainer(endpoint);
initializeContainer(container);
return container;
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
if (listenerContainer instanceof InitializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
}
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
}
}
int containerPhase = listenerContainer.getPhase();
if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
this.phase + " vs " + containerPhase);
}
this.phase = listenerContainer.getPhase();
}
return listenerContainer;
}
@Override
public void destroy() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
if (listenerContainer instanceof DisposableBean) {
try {
((DisposableBean) listenerContainer).destroy();
}
catch (Throwable ex) {
logger.warn("Failed to destroy message listener container", ex);
}
}
}
}
// Delegating implementation of SmartLifecycle
@Override
public int getPhase() {
return this.phase;
}
@Override
public void destroy() throws Exception {
for (MessageListenerContainer container : getContainers()) {
stopContainer(container);
}
public boolean isAutoStartup() {
return true;
}
protected void initializeContainer(MessageListenerContainer container) {
container.start();
if (container instanceof InitializingBean) {
try {
((InitializingBean) container).afterPropertiesSet();
}
catch (Exception e) {
throw new BeanInitializationException("Could not start message listener container", e);
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
if (listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
}
protected void stopContainer(MessageListenerContainer container) throws Exception {
container.stop();
if (container instanceof DisposableBean) {
((DisposableBean) container).destroy();
@Override
public void stop() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
listenerContainer.stop();
}
}
@Override
public void stop(Runnable callback) {
Collection<MessageListenerContainer> listenerContainers = getListenerContainers();
AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback);
for (MessageListenerContainer listenerContainer : listenerContainers) {
listenerContainer.stop(aggregatingCallback);
}
}
@Override
public boolean isRunning() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
if (listenerContainer.isRunning()) {
return true;
}
}
return false;
}
private static class AggregatingCallback implements Runnable {
private final AtomicInteger count;
private final Runnable finishCallback;
public AggregatingCallback(int count, Runnable finishCallback) {
this.count = new AtomicInteger(count);
this.finishCallback = finishCallback;
}
@Override
public void run() {
if (this.count.decrementAndGet() == 0) {
this.finishCallback.run();
}
}
}

View File

@ -43,6 +43,7 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
private JmsHandlerMethodFactory jmsHandlerMethodFactory;
/**
* Set the object instance that should manage this endpoint.
*/
@ -51,19 +52,18 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
}
public Object getBean() {
return bean;
return this.bean;
}
/**
* Set the method to invoke to process a message managed by this
* endpoint.
* Set the method to invoke to process a message managed by this endpoint.
*/
public void setMethod(Method method) {
this.method = method;
}
public Method getMethod() {
return method;
return this.method;
}
/**
@ -75,13 +75,14 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
this.jmsHandlerMethodFactory = jmsHandlerMethodFactory;
}
@Override
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
Assert.state(jmsHandlerMethodFactory != null,
"Could not create message listener, message listener factory not set.");
Assert.state(this.jmsHandlerMethodFactory != null,
"Could not create message listener - message listener factory not set");
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
InvocableHandlerMethod invocableHandlerMethod =
jmsHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
this.jmsHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
messageListener.setHandlerMethod(invocableHandlerMethod);
String responseDestination = getDefaultResponseDestination();
if (StringUtils.hasText(responseDestination)) {
@ -122,12 +123,8 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
@Override
protected StringBuilder getEndpointDescription() {
return super.getEndpointDescription()
.append(" | bean='")
.append(this.bean)
.append("'")
.append(" | method='")
.append(this.method)
.append("'");
.append(" | bean='").append(this.bean).append("'")
.append(" | method='").append(this.method).append("'");
}
}

View File

@ -19,8 +19,8 @@ package org.springframework.jms.config;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
/**
* A {@link JmsListenerContainerFactory} implementation to build
* a {@link SimpleMessageListenerContainer}.
* A {@link JmsListenerContainerFactory} implementation to build a
* standard {@link SimpleMessageListenerContainer}.
*
* @author Stephane Nicoll
* @since 4.1

View File

@ -31,17 +31,23 @@ public class SimpleJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
private MessageListener messageListener;
/**
* Set the {@link MessageListener} to invoke when a message matching
* the endpoint is received.
*/
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
/**
* Return the {@link MessageListener} to invoke when a message matching
* the endpoint is received.
*/
public MessageListener getMessageListener() {
return messageListener;
return this.messageListener;
}
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
@Override
protected MessageListener createMessageListener(MessageListenerContainer container) {
@ -51,9 +57,7 @@ public class SimpleJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
@Override
protected StringBuilder getEndpointDescription() {
return super.getEndpointDescription()
.append(" | messageListener='")
.append(this.messageListener)
.append("'");
.append(" | messageListener='").append(this.messageListener).append("'");
}
}

View File

@ -16,11 +16,14 @@
package org.springframework.jms.listener;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
@ -29,7 +32,9 @@ import javax.jms.Topic;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ReflectionUtils;
/**
* Abstract base class for message listener containers. Can either host
@ -126,6 +131,13 @@ import org.springframework.util.ErrorHandler;
public abstract class AbstractMessageListenerContainer
extends AbstractJmsListeningContainer implements MessageListenerContainer {
private static final Method createSharedConsumerMethod = ClassUtils.getMethodIfAvailable(
Session.class, "createSharedConsumer", Topic.class, String.class, String.class);
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
private volatile Object destination;
private volatile String messageSelector;
@ -134,14 +146,18 @@ public abstract class AbstractMessageListenerContainer
private boolean subscriptionDurable = false;
private String durableSubscriptionName;
private boolean subscriptionShared = false;
private String subscriptionName;
private boolean pubSubNoLocal = false;
private MessageConverter messageConverter;
private ExceptionListener exceptionListener;
private ErrorHandler errorHandler;
private MessageConverter messageConverter;
private boolean exposeListenerSession = true;
private boolean acceptMessagesWhileStopping = false;
@ -252,8 +268,8 @@ public abstract class AbstractMessageListenerContainer
public void setMessageListener(Object messageListener) {
checkMessageListener(messageListener);
this.messageListener = messageListener;
if (this.durableSubscriptionName == null) {
this.durableSubscriptionName = getDefaultSubscriptionName(messageListener);
if (this.subscriptionName == null) {
this.subscriptionName = getDefaultSubscriptionName(messageListener);
}
}
@ -301,15 +317,20 @@ public abstract class AbstractMessageListenerContainer
/**
* Set whether to make the subscription durable. The durable subscription name
* to be used can be specified through the "durableSubscriptionName" property.
* to be used can be specified through the "subscriptionName" property.
* <p>Default is "false". Set this to "true" to register a durable subscription,
* typically in combination with a "durableSubscriptionName" value (unless
* typically in combination with a "subscriptionName" value (unless
* your message listener class name is good enough as subscription name).
* <p>Only makes sense when listening to a topic (pub-sub domain).
* @see #setDurableSubscriptionName
* <p>Only makes sense when listening to a topic (pub-sub domain),
* therefore this method switches the "pubSubDomain" flag as well.
* @see #setSubscriptionName
* @see #setPubSubDomain
*/
public void setSubscriptionDurable(boolean subscriptionDurable) {
this.subscriptionDurable = subscriptionDurable;
if (subscriptionDurable) {
setPubSubDomain(true);
}
}
/**
@ -320,25 +341,108 @@ public abstract class AbstractMessageListenerContainer
}
/**
* Set the name of a durable subscription to create. To be applied in case
* of a topic (pub-sub domain) with subscription durability activated.
* Set whether to make the subscription shared. The shared subscription name
* to be used can be specified through the "subscriptionName" property.
* <p>Default is "false". Set this to "true" to register a shared subscription,
* typically in combination with a "subscriptionName" value (unless
* your message listener class name is good enough as subscription name).
* Note that shared subscriptions may also be durable, so this flag can
* (and often will) be combined with "subscriptionDurable" as well.
* <p>Only makes sense when listening to a topic (pub-sub domain),
* therefore this method switches the "pubSubDomain" flag as well.
* <p><b>Requires a JMS 2.0 compatible message broker.</b>
* @see #setSubscriptionName
* @see #setSubscriptionDurable
* @see #setPubSubDomain
*/
public void setSubscriptionShared(boolean subscriptionShared) {
this.subscriptionShared = subscriptionShared;
if (subscriptionShared) {
setPubSubDomain(true);
}
}
/**
* Return whether to make the subscription shared.
*/
public boolean isSubscriptionShared() {
return this.subscriptionShared;
}
/**
* Set the name of a subscription to create. To be applied in case
* of a topic (pub-sub domain) with a shared or durable subscription.
* <p>The subscription name needs to be unique within this client's
* JMS client id. Default is the class name of the specified message listener.
* <p>Note: Only 1 concurrent consumer (which is the default of this
* message listener container) is allowed for each subscription,
* except for a shared subscription (which requires JMS 2.0).
* @see #setPubSubDomain
* @see #setSubscriptionDurable
* @see #setSubscriptionShared
* @see #setClientId
* @see #setMessageListener
*/
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
public String getSubscriptionName() {
return this.subscriptionName;
}
/**
* Set the name of a durable subscription to create. This method switches
* to pub-sub domain mode and activates subscription durability as well.
* <p>The durable subscription name needs to be unique within this client's
* JMS client id. Default is the class name of the specified message listener.
* <p>Note: Only 1 concurrent consumer (which is the default of this
* message listener container) is allowed for each durable subscription.
* message listener container) is allowed for each durable subscription,
* except for a shared durable subscription (which requires JMS 2.0).
* @see #setPubSubDomain
* @see #setSubscriptionDurable
* @see #setSubscriptionShared
* @see #setClientId
* @see #setMessageListener
*/
public void setDurableSubscriptionName(String durableSubscriptionName) {
this.durableSubscriptionName = durableSubscriptionName;
this.subscriptionName = durableSubscriptionName;
this.subscriptionDurable = true;
}
/**
* Return the name of a durable subscription to create, if any.
*/
public String getDurableSubscriptionName() {
return this.durableSubscriptionName;
return (this.subscriptionDurable ? this.subscriptionName : null);
}
/**
* Set whether to inhibit the delivery of messages published by its own connection.
* Default is "false".
* @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean)
*/
public void setPubSubNoLocal(boolean pubSubNoLocal) {
this.pubSubNoLocal = pubSubNoLocal;
}
/**
* Return whether to inhibit the delivery of messages published by its own connection.
*/
public boolean isPubSubNoLocal() {
return this.pubSubNoLocal;
}
/**
* Set the {@link MessageConverter} strategy for converting JMS Messages.
*/
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
@Override
public MessageConverter getMessageConverter() {
return this.messageConverter;
}
/**
@ -358,25 +462,21 @@ public abstract class AbstractMessageListenerContainer
}
/**
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown
* while processing a Message. By default there will be <b>no</b> ErrorHandler
* so that error-level logging is the only result.
* Set the ErrorHandler to be invoked in case of any uncaught exceptions thrown
* while processing a Message.
* <p>By default, there will be <b>no</b> ErrorHandler so that error-level
* logging is the only result.
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
/**
* Set the {@link MessageConverter} strategy for converting JMS Messages.
* @param messageConverter the message converter to use
* Return the ErrorHandler to be invoked in case of any uncaught exceptions thrown
* while processing a Message.
*/
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
@Override
public MessageConverter getMessageConverter() {
return messageConverter;
public ErrorHandler getErrorHandler() {
return this.errorHandler;
}
/**
@ -436,9 +536,6 @@ public abstract class AbstractMessageListenerContainer
if (this.destination == null) {
throw new IllegalArgumentException("Property 'destination' or 'destinationName' is required");
}
if (isSubscriptionDurable() && !isPubSubDomain()) {
throw new IllegalArgumentException("A durable subscription requires a topic (pub-sub domain)");
}
}
@Override
@ -446,6 +543,7 @@ public abstract class AbstractMessageListenerContainer
setMessageListener(messageListener);
}
//-------------------------------------------------------------------------
// Template methods for listener execution
//-------------------------------------------------------------------------
@ -668,6 +766,51 @@ public abstract class AbstractMessageListenerContainer
return isSessionTransacted();
}
/**
* Create a JMS MessageConsumer for the given Session and Destination.
* <p>This implementation uses JMS 1.1 API.
* @param session the JMS Session to create a MessageConsumer for
* @param destination the JMS Destination to create a MessageConsumer for
* @return the new JMS MessageConsumer
* @throws javax.jms.JMSException if thrown by JMS API methods
*/
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
if (isPubSubDomain() && destination instanceof Topic) {
if (isSubscriptionShared()) {
// createSharedConsumer((Topic) dest, subscription, selector);
// createSharedDurableConsumer((Topic) dest, subscription, selector);
Method method = (isSubscriptionDurable() ?
createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}
catch (InvocationTargetException ex) {
if (ex.getTargetException() instanceof JMSException) {
throw (JMSException) ex.getTargetException();
}
ReflectionUtils.handleInvocationTargetException(ex);
return null;
}
catch (IllegalAccessException ex) {
throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage());
}
}
else if (isSubscriptionDurable()) {
return session.createDurableSubscriber(
(Topic) destination, getSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
}
else {
// Only pass in the NoLocal flag in case of a Topic (pub-sub mode):
// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
// in case of the NoLocal flag being specified for a Queue.
return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
}
}
else {
return session.createConsumer(destination, getMessageSelector());
}
}
/**
* Handle the given exception that arose during listener execution.
* <p>The default implementation logs the exception at warn level,
@ -714,8 +857,9 @@ public abstract class AbstractMessageListenerContainer
* @see #setErrorHandler
*/
protected void invokeErrorHandler(Throwable ex) {
if (this.errorHandler != null) {
this.errorHandler.handleError(ex);
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
errorHandler.handleError(ex);
}
else if (logger.isWarnEnabled()) {
logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex);

View File

@ -87,8 +87,6 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
private boolean sessionTransactedCalled = false;
private boolean pubSubNoLocal = false;
private PlatformTransactionManager transactionManager;
private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
@ -104,22 +102,6 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
this.sessionTransactedCalled = true;
}
/**
* Set whether to inhibit the delivery of messages published by its own connection.
* Default is "false".
* @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
*/
public void setPubSubNoLocal(boolean pubSubNoLocal) {
this.pubSubNoLocal = pubSubNoLocal;
}
/**
* Return whether to inhibit the delivery of messages published by its own connection.
*/
protected boolean isPubSubNoLocal() {
return this.pubSubNoLocal;
}
/**
* Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
* to use for transactional wrapping of message reception plus listener execution.
@ -452,11 +434,6 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
protected void noMessageReceived(Object invoker, Session session) {
}
//-------------------------------------------------------------------------
// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
//-------------------------------------------------------------------------
/**
* Fetch an appropriate Connection from the given JmsResourceHolder.
* <p>This implementation accepts any JMS 1.1 Connection.
@ -479,32 +456,6 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
return holder.getSession();
}
/**
* Create a JMS MessageConsumer for the given Session and Destination.
* <p>This implementation uses JMS 1.1 API.
* @param session the JMS Session to create a MessageConsumer for
* @param destination the JMS Destination to create a MessageConsumer for
* @return the new JMS MessageConsumer
* @throws javax.jms.JMSException if thrown by JMS API methods
*/
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
// Only pass in the NoLocal flag in case of a Topic:
// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
// in case of the NoLocal flag being specified for a Queue.
if (isPubSubDomain()) {
if (isSubscriptionDurable() && destination instanceof Topic) {
return session.createDurableSubscriber(
(Topic) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
}
else {
return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
}
}
else {
return session.createConsumer(destination, getMessageSelector());
}
}
/**
* ResourceFactory implementation that delegates to this listener container's protected callback methods.

View File

@ -16,7 +16,7 @@
package org.springframework.jms.listener;
import org.springframework.context.Lifecycle;
import org.springframework.context.SmartLifecycle;
import org.springframework.jms.support.converter.MessageConverter;
/**
@ -27,7 +27,7 @@ import org.springframework.jms.support.converter.MessageConverter;
* @author Stephane Nicoll
* @since 4.1
*/
public interface MessageListenerContainer extends Lifecycle {
public interface MessageListenerContainer extends SmartLifecycle {
/**
* Setup the message listener to use. Throws an {@link IllegalArgumentException}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,7 +27,6 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.support.TransactionSynchronizationManager;
@ -60,8 +59,6 @@ import org.springframework.util.Assert;
*/
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements ExceptionListener {
private boolean pubSubNoLocal = false;
private boolean connectLazily = false;
private int concurrentConsumers = 1;
@ -75,22 +72,6 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
private final Object consumersMonitor = new Object();
/**
* Set whether to inhibit the delivery of messages published by its own connection.
* Default is "false".
* @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
*/
public void setPubSubNoLocal(boolean pubSubNoLocal) {
this.pubSubNoLocal = pubSubNoLocal;
}
/**
* Return whether to inhibit the delivery of messages published by its own connection.
*/
protected boolean isPubSubNoLocal() {
return this.pubSubNoLocal;
}
/**
* Specify whether to connect lazily, i.e. whether to establish the JMS Connection
* and the corresponding Sessions and MessageConsumers as late as possible -
@ -370,35 +351,4 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
}
}
//-------------------------------------------------------------------------
// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
//-------------------------------------------------------------------------
/**
* Create a JMS MessageConsumer for the given Session and Destination.
* <p>This implementation uses JMS 1.1 API.
* @param session the JMS Session to create a MessageConsumer for
* @param destination the JMS Destination to create a MessageConsumer for
* @return the new JMS MessageConsumer
* @throws JMSException if thrown by JMS API methods
*/
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
// Only pass in the NoLocal flag in case of a Topic:
// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
// in case of the NoLocal flag being specified for a Queue.
if (isPubSubDomain()) {
if (isSubscriptionDurable() && destination instanceof Topic) {
return session.createDurableSubscriber(
(Topic) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
}
else {
return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
}
}
else {
return session.createConsumer(destination, getMessageSelector());
}
}
}

View File

@ -269,7 +269,7 @@ public abstract class AbstractAdaptableMessageListener
MessageConverter converter = getMessageConverter();
if (converter != null) {
if (result instanceof org.springframework.messaging.Message) {
return messagingMessageConverter.toMessage(result, session);
return this.messagingMessageConverter.toMessage(result, session);
}
else {
return converter.toMessage(result, session);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,7 +26,7 @@ import org.springframework.beans.BeanWrapper;
/**
* Default implementation of the {@link JmsActivationSpecFactory} interface.
* Supports the standard JMS properties as defined by the JMS 1.5 specification,
* Supports the standard JMS properties as defined by the JCA 1.5 specification,
* as well as Spring's extended "maxConcurrency" and "prefetchSize" settings
* through autodetection of well-known vendor-specific provider properties.
*
@ -158,7 +158,7 @@ public class DefaultJmsActivationSpecFactory extends StandardJmsActivationSpecFa
// JORAM
bw.setPropertyValue("maxMessages", Integer.toString(config.getPrefetchSize()));
}
else if(bw.isWritableProperty("maxBatchSize")){
else if (bw.isWritableProperty("maxBatchSize")){
// WebSphere
bw.setPropertyValue("maxBatchSize", Integer.toString(config.getPrefetchSize()));
}

View File

@ -48,7 +48,9 @@ public class JmsActivationSpecConfig {
private boolean subscriptionDurable = false;
private String durableSubscriptionName;
private boolean subscriptionShared = false;
private String subscriptionName;
private String clientId;
@ -81,18 +83,41 @@ public class JmsActivationSpecConfig {
public void setSubscriptionDurable(boolean subscriptionDurable) {
this.subscriptionDurable = subscriptionDurable;
if (subscriptionDurable) {
this.pubSubDomain = true;
}
}
public boolean isSubscriptionDurable() {
return this.subscriptionDurable;
}
public void setSubscriptionShared(boolean subscriptionShared) {
this.subscriptionShared = subscriptionShared;
if (subscriptionShared) {
this.pubSubDomain = true;
}
}
public boolean isSubscriptionShared() {
return this.subscriptionShared;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
public String getSubscriptionName() {
return this.subscriptionName;
}
public void setDurableSubscriptionName(String durableSubscriptionName) {
this.durableSubscriptionName = durableSubscriptionName;
this.subscriptionName = durableSubscriptionName;
this.subscriptionDurable = true;
}
public String getDurableSubscriptionName() {
return this.durableSubscriptionName;
return (this.subscriptionDurable ? this.subscriptionName : null);
}
public void setClientId(String clientId) {
@ -217,7 +242,7 @@ public class JmsActivationSpecConfig {
* Return the {@link MessageConverter} to use, if any.
*/
public MessageConverter getMessageConverter() {
return messageConverter;
return this.messageConverter;
}
}

View File

@ -144,7 +144,42 @@ public class JmsMessageEndpointManager extends GenericMessageEndpointManager
* should use for activating its listener. Return {@code null} if none is set.
*/
public JmsActivationSpecConfig getActivationSpecConfig() {
return activationSpecConfig;
return this.activationSpecConfig;
}
/**
* Set the name of this message endpoint. Populated with the bean name
* automatically when defined within Spring's bean factory.
*/
@Override
public void setBeanName(String beanName) {
this.endpointFactory.setBeanName(beanName);
}
@Override
public void afterPropertiesSet() throws ResourceException {
if (this.messageListenerSet) {
setMessageEndpointFactory(this.endpointFactory);
}
if (this.activationSpecConfig != null) {
setActivationSpec(
this.activationSpecFactory.createActivationSpec(getResourceAdapter(), this.activationSpecConfig));
}
super.afterPropertiesSet();
}
@Override
public void setupMessageListener(Object messageListener) {
if (messageListener instanceof MessageListener) {
setMessageListener((MessageListener) messageListener);
}
else {
throw new IllegalArgumentException("Unsupported message listener '" +
messageListener.getClass().getName() + "': only '" + MessageListener.class.getName() +
"' type is supported");
}
}
@Override
@ -162,40 +197,7 @@ public class JmsMessageEndpointManager extends GenericMessageEndpointManager
if (config != null) {
return config.isPubSubDomain();
}
throw new IllegalStateException("could not determine pubSubDomain, no activation spec config is set");
}
/**
* Set the name of this message endpoint. Populated with the bean name
* automatically when defined within Spring's bean factory.
*/
@Override
public void setBeanName(String beanName) {
this.endpointFactory.setBeanName(beanName);
}
@Override
public void setupMessageListener(Object messageListener) {
if (messageListener instanceof MessageListener) {
setMessageListener((MessageListener) messageListener);
}
else {
throw new IllegalArgumentException("Unsupported message listener '"
+ messageListener.getClass().getName() + "': only '"
+ MessageListener.class.getName() + "' type is supported");
}
}
@Override
public void afterPropertiesSet() throws ResourceException {
if (this.messageListenerSet) {
setMessageEndpointFactory(this.endpointFactory);
}
if (this.activationSpecConfig != null) {
setActivationSpec(
this.activationSpecFactory.createActivationSpec(getResourceAdapter(), this.activationSpecConfig));
}
super.afterPropertiesSet();
throw new IllegalStateException("Could not determine pubSubDomain - no activation spec config is set");
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -153,17 +153,19 @@ public class StandardJmsActivationSpecFactory implements JmsActivationSpecFactor
throw new IllegalArgumentException(
"Durable subscriptions not supported by underlying provider: " + this.activationSpecClass.getName());
}
if (config.getDurableSubscriptionName() != null) {
bw.setPropertyValue("subscriptionName", config.getDurableSubscriptionName());
if (config.isSubscriptionShared()) {
throw new IllegalArgumentException("Shared subscriptions not supported for JCA-driven endpoints");
}
if (config.getSubscriptionName() != null) {
bw.setPropertyValue("subscriptionName", config.getSubscriptionName());
}
if (config.getClientId() != null) {
bw.setPropertyValue("clientId", config.getClientId());
}
if (config.getMessageSelector() != null) {
bw.setPropertyValue("messageSelector", config.getMessageSelector());
}
applyAcknowledgeMode(bw, config.getAcknowledgeMode());
}

View File

@ -218,8 +218,8 @@
<xsd:attribute name="destination-type" default="queue">
<xsd:annotation>
<xsd:documentation><![CDATA[
The JMS destination type for this listener: "queue", "topic" or "durableTopic".
The default is "queue".
The JMS destination type for this listener: "queue", "topic", "durableTopic",
"sharedTopic", "sharedDurableTopic". The default is "queue".
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
@ -227,6 +227,8 @@
<xsd:enumeration value="queue"/>
<xsd:enumeration value="topic"/>
<xsd:enumeration value="durableTopic"/>
<xsd:enumeration value="sharedTopic"/>
<xsd:enumeration value="sharedDurableTopic"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
@ -234,7 +236,7 @@
<xsd:annotation>
<xsd:documentation><![CDATA[
The JMS client id for this listener container.
Needs to be specified when using durable subscriptions.
Needs to be specified when using subscriptions.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>

View File

@ -76,8 +76,8 @@ public abstract class AbstractJmsAnnotationDrivenTests {
context.getBean("jmsListenerContainerFactory", JmsListenerContainerTestFactory.class);
JmsListenerContainerTestFactory simpleFactory =
context.getBean("simpleFactory", JmsListenerContainerTestFactory.class);
assertEquals(1, defaultFactory.getContainers().size());
assertEquals(1, simpleFactory.getContainers().size());
assertEquals(1, defaultFactory.getListenerContainers().size());
assertEquals(1, simpleFactory.getListenerContainers().size());
}
@Component
@ -100,9 +100,9 @@ public abstract class AbstractJmsAnnotationDrivenTests {
public void testFullConfiguration(ApplicationContext context) {
JmsListenerContainerTestFactory simpleFactory =
context.getBean("simpleFactory", JmsListenerContainerTestFactory.class);
assertEquals(1, simpleFactory.getContainers().size());
assertEquals(1, simpleFactory.getListenerContainers().size());
MethodJmsListenerEndpoint endpoint = (MethodJmsListenerEndpoint)
simpleFactory.getContainers().get(0).getEndpoint();
simpleFactory.getListenerContainers().get(0).getEndpoint();
assertEquals("listener1", endpoint.getId());
assertEquals("queueIn", endpoint.getDestination());
assertEquals("mySelector", endpoint.getSelector());
@ -129,9 +129,9 @@ public abstract class AbstractJmsAnnotationDrivenTests {
context.getBean("jmsListenerContainerFactory", JmsListenerContainerTestFactory.class);
JmsListenerContainerTestFactory customFactory =
context.getBean("customFactory", JmsListenerContainerTestFactory.class);
assertEquals(1, defaultFactory.getContainers().size());
assertEquals(1, customFactory.getContainers().size());
JmsListenerEndpoint endpoint = defaultFactory.getContainers().get(0).getEndpoint();
assertEquals(1, defaultFactory.getListenerContainers().size());
assertEquals(1, customFactory.getListenerContainers().size());
JmsListenerEndpoint endpoint = defaultFactory.getListenerContainers().get(0).getEndpoint();
assertEquals("Wrong endpoint type", SimpleJmsListenerEndpoint.class, endpoint.getClass());
assertEquals("Wrong listener set in custom endpoint", context.getBean("simpleMessageListener"),
((SimpleJmsListenerEndpoint) endpoint).getMessageListener());
@ -139,11 +139,11 @@ public abstract class AbstractJmsAnnotationDrivenTests {
JmsListenerEndpointRegistry customRegistry =
context.getBean("customRegistry", JmsListenerEndpointRegistry.class);
assertEquals("Wrong number of containers in the registry", 2,
customRegistry.getContainers().size());
customRegistry.getListenerContainers().size());
assertNotNull("Container with custom id on the annotation should be found",
customRegistry.getContainer("listenerId"));
customRegistry.getListenerContainer("listenerId"));
assertNotNull("Container created with custom id should be found",
customRegistry.getContainer("myCustomEndpointId"));
customRegistry.getListenerContainer("myCustomEndpointId"));
}
@Component
@ -162,7 +162,7 @@ public abstract class AbstractJmsAnnotationDrivenTests {
public void testExplicitContainerFactoryConfiguration(ApplicationContext context) {
JmsListenerContainerTestFactory defaultFactory =
context.getBean("simpleFactory", JmsListenerContainerTestFactory.class);
assertEquals(1, defaultFactory.getContainers().size());
assertEquals(1, defaultFactory.getListenerContainers().size());
}
/**
@ -172,7 +172,7 @@ public abstract class AbstractJmsAnnotationDrivenTests {
public void testDefaultContainerFactoryConfiguration(ApplicationContext context) {
JmsListenerContainerTestFactory defaultFactory =
context.getBean("jmsListenerContainerFactory", JmsListenerContainerTestFactory.class);
assertEquals(1, defaultFactory.getContainers().size());
assertEquals(1, defaultFactory.getListenerContainers().size());
}
static class DefaultBean {
@ -191,12 +191,12 @@ public abstract class AbstractJmsAnnotationDrivenTests {
public void testJmsHandlerMethodFactoryConfiguration(ApplicationContext context) throws JMSException {
JmsListenerContainerTestFactory simpleFactory =
context.getBean("defaultFactory", JmsListenerContainerTestFactory.class);
assertEquals(1, simpleFactory.getContainers().size());
assertEquals(1, simpleFactory.getListenerContainers().size());
MethodJmsListenerEndpoint endpoint = (MethodJmsListenerEndpoint)
simpleFactory.getContainers().get(0).getEndpoint();
simpleFactory.getListenerContainers().get(0).getEndpoint();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
endpoint.setupMessageContainer(container);
endpoint.setupListenerContainer(container);
MessagingMessageListenerAdapter listener = (MessagingMessageListenerAdapter) container.getMessageListener();
listener.onMessage(new StubTextMessage("failValidation"), mock(Session.class));
}

View File

@ -16,8 +16,6 @@
package org.springframework.jms.annotation;
import static org.junit.Assert.*;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@ -38,47 +36,51 @@ import org.springframework.jms.config.MethodJmsListenerEndpoint;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Component;
import static org.junit.Assert.*;
/**
*
* @author Stephane Nicoll
*/
public class JmsListenerAnnotationBeanPostProcessorTests {
@Test
public void simpleMessageListener() {
final ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(Config.class,
SimpleMessageListenerTestBean.class);
ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(
Config.class, SimpleMessageListenerTestBean.class);
JmsListenerContainerTestFactory factory = context.getBean(JmsListenerContainerTestFactory.class);
assertEquals("one container should have been registered", 1, factory.getContainers().size());
MessageListenerTestContainer container = factory.getContainers().get(0);
assertEquals("One container should have been registered", 1, factory.getListenerContainers().size());
MessageListenerTestContainer container = factory.getListenerContainers().get(0);
JmsListenerEndpoint endpoint = container.getEndpoint();
assertEquals("Wrong endpoint type", MethodJmsListenerEndpoint.class, endpoint.getClass());
MethodJmsListenerEndpoint methodEndpoint = (MethodJmsListenerEndpoint) endpoint;
assertNotNull(methodEndpoint.getBean());
assertNotNull(methodEndpoint.getMethod());
assertTrue("Should have been started " + container, container.isStarted());
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
methodEndpoint.setupMessageContainer(listenerContainer);
methodEndpoint.setupListenerContainer(listenerContainer);
assertNotNull(listenerContainer.getMessageListener());
context.start();
assertTrue("Should have been started " + container, container.isStarted());
context.close(); // Close and stop the listeners
assertTrue("Should have been stopped " + container, container.isStopped());
}
@Test
public void metaAnnotationIsDiscovered() {
final ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(Config.class,
MetaAnnotationTestBean.class);
ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(
Config.class, MetaAnnotationTestBean.class);
JmsListenerContainerTestFactory factory = context.getBean(JmsListenerContainerTestFactory.class);
assertEquals("one container should have been registered", 1, factory.getContainers().size());
JmsListenerEndpoint endpoint = factory.getContainers().get(0).getEndpoint();
assertEquals("one container should have been registered", 1, factory.getListenerContainers().size());
JmsListenerEndpoint endpoint = factory.getListenerContainers().get(0).getEndpoint();
assertEquals("metaTestQueue", ((AbstractJmsListenerEndpoint) endpoint).getDestination());
}
@Component
static class SimpleMessageListenerTestBean {
@ -88,6 +90,7 @@ public class JmsListenerAnnotationBeanPostProcessorTests {
}
@Component
static class MetaAnnotationTestBean {
@ -101,9 +104,9 @@ public class JmsListenerAnnotationBeanPostProcessorTests {
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
static @interface FooListener {
}
@Configuration
static class Config {
@ -124,6 +127,6 @@ public class JmsListenerAnnotationBeanPostProcessorTests {
public JmsListenerContainerTestFactory testFactory() {
return new JmsListenerContainerTestFactory();
}
}
}

View File

@ -83,7 +83,7 @@ public class JmsListenerContainerFactoryIntegrationTests {
@SuppressWarnings("unchecked")
private void invokeListener(JmsListenerEndpoint endpoint, Message message) throws JMSException {
DefaultMessageListenerContainer messageListenerContainer =
containerFactory.createMessageListenerContainer(endpoint);
containerFactory.createListenerContainer(endpoint);
Object listener = messageListenerContainer.getMessageListener();
if (listener instanceof SessionAwareMessageListener) {
((SessionAwareMessageListener<Message>) listener).onMessage(message, mock(Session.class));

View File

@ -45,7 +45,6 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
*
* @author Stephane Nicoll
*/
public class JmsListenerContainerFactoryTests {
@ -61,6 +60,7 @@ public class JmsListenerContainerFactoryTests {
private final TransactionManager transactionManager = mock(TransactionManager.class);
@Test
public void createSimpleContainer() {
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
@ -71,14 +71,13 @@ public class JmsListenerContainerFactoryTests {
endpoint.setMessageListener(messageListener);
endpoint.setDestination("myQueue");
SimpleMessageListenerContainer container = factory.createMessageListenerContainer(endpoint);
SimpleMessageListenerContainer container = factory.createListenerContainer(endpoint);
assertDefaultJmsConfig(container);
assertEquals(messageListener, container.getMessageListener());
assertEquals("myQueue", container.getDestinationName());
}
@Test
public void createJmsContainerFullConfig() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
@ -91,7 +90,7 @@ public class JmsListenerContainerFactoryTests {
MessageListener messageListener = new MessageListenerAdapter();
endpoint.setMessageListener(messageListener);
endpoint.setDestination("myQueue");
DefaultMessageListenerContainer container = factory.createMessageListenerContainer(endpoint);
DefaultMessageListenerContainer container = factory.createListenerContainer(endpoint);
assertDefaultJmsConfig(container);
assertEquals(DefaultMessageListenerContainer.CACHE_CONSUMER, container.getCacheLevel());
@ -107,13 +106,13 @@ public class JmsListenerContainerFactoryTests {
public void createJcaContainerFullConfig() {
DefaultJcaListenerContainerFactory factory = new DefaultJcaListenerContainerFactory();
setDefaultJcaConfig(factory);
factory.getActivationSpecConfig().setConcurrency("10");
factory.setConcurrency("10");
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
MessageListener messageListener = new MessageListenerAdapter();
endpoint.setMessageListener(messageListener);
endpoint.setDestination("myQueue");
JmsMessageEndpointManager container = factory.createMessageListenerContainer(endpoint);
JmsMessageEndpointManager container = factory.createListenerContainer(endpoint);
assertDefaultJcaConfig(container);
assertEquals(10, container.getActivationSpecConfig().getMaxConcurrency());
@ -130,7 +129,7 @@ public class JmsListenerContainerFactoryTests {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setMessageListener(new MessageListenerAdapter());
thrown.expect(IllegalStateException.class);
factory.createMessageListenerContainer(endpoint);
factory.createListenerContainer(endpoint);
}
@Test
@ -144,7 +143,7 @@ public class JmsListenerContainerFactoryTests {
MessageListener messageListener = new MessageListenerAdapter();
endpoint.setMessageListener(messageListener);
endpoint.setDestination("myQueue");
DefaultMessageListenerContainer container = factory.createMessageListenerContainer(endpoint);
DefaultMessageListenerContainer container = factory.createListenerContainer(endpoint);
assertSame(backOff, new DirectFieldAccessor(container).getPropertyValue("backOff"));
}
@ -174,13 +173,11 @@ public class JmsListenerContainerFactoryTests {
private void setDefaultJcaConfig(DefaultJcaListenerContainerFactory factory) {
factory.setDestinationResolver(destinationResolver);
factory.setTransactionManager(transactionManager);
JmsActivationSpecConfig config = new JmsActivationSpecConfig();
config.setMessageConverter(messageConverter);
config.setAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
config.setPubSubDomain(true);
config.setSubscriptionDurable(true);
config.setClientId("client-1234");
factory.setActivationSpecConfig(config);
factory.setMessageConverter(messageConverter);
factory.setAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
factory.setPubSubDomain(true);
factory.setSubscriptionDurable(true);
factory.setClientId("client-1234");
}
private void assertDefaultJcaConfig(JmsMessageEndpointManager container) {

View File

@ -25,17 +25,17 @@ import java.util.List;
*/
public class JmsListenerContainerTestFactory implements JmsListenerContainerFactory<MessageListenerTestContainer> {
private final List<MessageListenerTestContainer> containers =
private final List<MessageListenerTestContainer> listenerContainers =
new ArrayList<MessageListenerTestContainer>();
public List<MessageListenerTestContainer> getContainers() {
return containers;
public List<MessageListenerTestContainer> getListenerContainers() {
return listenerContainers;
}
@Override
public MessageListenerTestContainer createMessageListenerContainer(JmsListenerEndpoint endpoint) {
public MessageListenerTestContainer createListenerContainer(JmsListenerEndpoint endpoint) {
MessageListenerTestContainer container = new MessageListenerTestContainer(endpoint);
this.containers.add(container);
this.listenerContainers.add(container);
return container;
}

View File

@ -65,8 +65,8 @@ public class JmsListenerEndpointRegistrarTests {
registrar.setContainerFactory(containerFactory);
registrar.registerEndpoint(endpoint, null);
registrar.afterPropertiesSet();
assertNotNull("Container not created", registry.getContainer("some id"));
assertEquals(1, registry.getContainers().size());
assertNotNull("Container not created", registry.getListenerContainer("some id"));
assertEquals(1, registry.getListenerContainers().size());
}
@Test
@ -87,8 +87,8 @@ public class JmsListenerEndpointRegistrarTests {
registrar.setContainerFactory(containerFactory);
registrar.registerEndpoint(endpoint);
registrar.afterPropertiesSet();
assertNotNull("Container not created", registry.getContainer("myEndpoint"));
assertEquals(1, registry.getContainers().size());
assertNotNull("Container not created", registry.getListenerContainer("myEndpoint"));
assertEquals(1, registry.getListenerContainers().size());
}
}

View File

@ -36,27 +36,27 @@ public class JmsListenerEndpointRegistryTests {
@Test
public void createWithNullEndpoint() {
thrown.expect(IllegalArgumentException.class);
registry.createJmsListenerContainer(null, containerFactory);
registry.registerListenerContainer(null, containerFactory);
}
@Test
public void createWithNullEndpointId() {
thrown.expect(IllegalArgumentException.class);
registry.createJmsListenerContainer(new SimpleJmsListenerEndpoint(), containerFactory);
registry.registerListenerContainer(new SimpleJmsListenerEndpoint(), containerFactory);
}
@Test
public void createWithNullContainerFactory() {
thrown.expect(IllegalArgumentException.class);
registry.createJmsListenerContainer(createEndpoint("foo", "myDestination"), null);
registry.registerListenerContainer(createEndpoint("foo", "myDestination"), null);
}
@Test
public void createWithDuplicateEndpointId() {
registry.createJmsListenerContainer(createEndpoint("test", "queue"), containerFactory);
registry.registerListenerContainer(createEndpoint("test", "queue"), containerFactory);
thrown.expect(IllegalStateException.class);
registry.createJmsListenerContainer(createEndpoint("test", "queue"), containerFactory);
registry.registerListenerContainer(createEndpoint("test", "queue"), containerFactory);
}
private SimpleJmsListenerEndpoint createEndpoint(String id, String destinationName) {

View File

@ -16,9 +16,6 @@
package org.springframework.jms.config;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import javax.jms.MessageListener;
import org.junit.Rule;
@ -33,8 +30,10 @@ import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.jms.listener.endpoint.JmsActivationSpecConfig;
import org.springframework.jms.listener.endpoint.JmsMessageEndpointManager;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
*
* @author Stephane Nicoll
*/
public class JmsListenerEndpointTests {
@ -53,10 +52,10 @@ public class JmsListenerEndpointTests {
endpoint.setConcurrency("5-10");
endpoint.setMessageListener(messageListener);
endpoint.setupMessageContainer(container);
endpoint.setupListenerContainer(container);
assertEquals("myQueue", container.getDestinationName());
assertEquals("foo = 'bar'", container.getMessageSelector());
assertEquals("mySubscription", container.getDurableSubscriptionName());
assertEquals("mySubscription", container.getSubscriptionName());
assertEquals(5, container.getConcurrentConsumers());
assertEquals(10, container.getMaxConcurrentConsumers());
assertEquals(messageListener, container.getMessageListener());
@ -73,11 +72,11 @@ public class JmsListenerEndpointTests {
endpoint.setConcurrency("10");
endpoint.setMessageListener(messageListener);
endpoint.setupMessageContainer(container);
endpoint.setupListenerContainer(container);
JmsActivationSpecConfig config = container.getActivationSpecConfig();
assertEquals("myQueue", config.getDestinationName());
assertEquals("foo = 'bar'", config.getMessageSelector());
assertEquals("mySubscription", config.getDurableSubscriptionName());
assertEquals("mySubscription", config.getSubscriptionName());
assertEquals(10, config.getMaxConcurrency());
assertEquals(messageListener, container.getMessageListener());
}
@ -90,7 +89,7 @@ public class JmsListenerEndpointTests {
endpoint.setConcurrency("5-10"); // simple implementation only support max value
endpoint.setMessageListener(messageListener);
endpoint.setupMessageContainer(container);
endpoint.setupListenerContainer(container);
assertEquals(10, new DirectFieldAccessor(container).getPropertyValue("concurrentConsumers"));
}
@ -100,7 +99,7 @@ public class JmsListenerEndpointTests {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
thrown.expect(IllegalStateException.class);
endpoint.setupMessageContainer(container);
endpoint.setupListenerContainer(container);
}
@Test
@ -110,7 +109,7 @@ public class JmsListenerEndpointTests {
endpoint.setMessageListener(new MessageListenerAdapter());
thrown.expect(IllegalArgumentException.class);
endpoint.setupMessageContainer(container);
endpoint.setupListenerContainer(container);
}

View File

@ -145,7 +145,7 @@ public class JmsNamespaceHandlerTests {
assertNotNull("No factory registered with testJmsFactory id", factory);
DefaultMessageListenerContainer container =
factory.createMessageListenerContainer(createDummyEndpoint());
factory.createListenerContainer(createDummyEndpoint());
assertEquals("explicit connection factory not set",
context.getBean(EXPLICIT_CONNECTION_FACTORY), container.getConnectionFactory());
assertEquals("explicit destination resolver not set",
@ -156,9 +156,8 @@ public class JmsNamespaceHandlerTests {
assertEquals("wrong concurrency", 3, container.getConcurrentConsumers());
assertEquals("wrong concurrency", 5, container.getMaxConcurrentConsumers());
assertEquals("wrong prefetch", 50, container.getMaxMessagesPerTask());
assertSame(context.getBean("testBackOff"),new DirectFieldAccessor(container).getPropertyValue("backOff"));
assertEquals("phase cannot be customized by the factory", Integer.MAX_VALUE, container.getPhase());
assertEquals("Wrong phase", 99, container.getPhase());
assertSame(context.getBean("testBackOff"), new DirectFieldAccessor(container).getPropertyValue("backOff"));
}
@Test
@ -169,14 +168,14 @@ public class JmsNamespaceHandlerTests {
assertNotNull("No factory registered with testJcaFactory id", factory);
JmsMessageEndpointManager container =
factory.createMessageListenerContainer(createDummyEndpoint());
factory.createListenerContainer(createDummyEndpoint());
assertEquals("explicit resource adapter not set",
context.getBean("testResourceAdapter"),container.getResourceAdapter());
assertEquals("explicit message converter not set",
context.getBean("testMessageConverter"), container.getActivationSpecConfig().getMessageConverter());
assertEquals("wrong concurrency", 5, container.getActivationSpecConfig().getMaxConcurrency());
assertEquals("Wrong prefetch", 50, container.getActivationSpecConfig().getPrefetchSize());
assertEquals("phase cannot be customized by the factory", Integer.MAX_VALUE, container.getPhase());
assertEquals("Wrong phase", 77, container.getPhase());
}
@Test

View File

@ -23,7 +23,6 @@ import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
/**
*
* @author Stephane Nicoll
*/
public class MessageListenerTestContainer
@ -57,17 +56,15 @@ public class MessageListenerTestContainer
@Override
public void start() throws JmsException {
if (!initializationInvoked) {
throw new IllegalStateException("afterPropertiesSet should have been invoked before start on " + this);
}
if (startInvoked) {
throw new IllegalStateException("Start already invoked on " + this);
}
startInvoked = true;
}
@Override
public boolean isRunning() {
return startInvoked && !stopInvoked;
}
@Override
public void stop() throws JmsException {
if (stopInvoked) {
@ -77,8 +74,28 @@ public class MessageListenerTestContainer
}
@Override
public void setupMessageListener(Object messageListener) {
public boolean isRunning() {
return startInvoked && !stopInvoked;
}
@Override
public int getPhase() {
return 0;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stopInvoked = true;
callback.run();
}
@Override
public void setupMessageListener(Object messageListener) {
}
@Override
@ -93,10 +110,6 @@ public class MessageListenerTestContainer
@Override
public void afterPropertiesSet() {
if (!startInvoked) {
throw new IllegalStateException("Start should have been invoked before " +
"afterPropertiesSet on " + this);
}
initializationInvoked = true;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -62,9 +62,7 @@ public class DefaultJmsActivationSpecFactoryTests extends TestCase {
Destination destination = new StubQueue();
DestinationResolver destinationResolver = mock(DestinationResolver.class);
given(destinationResolver.resolveDestinationName(null, "destinationname",
false)).willReturn(destination);
given(destinationResolver.resolveDestinationName(null, "destinationname", false)).willReturn(destination);
DefaultJmsActivationSpecFactory activationSpecFactory = new DefaultJmsActivationSpecFactory();
activationSpecFactory.setDestinationResolver(destinationResolver);