diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java b/spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java index f60fbbd42c4..3efe96c08c1 100644 --- a/spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java +++ b/spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java @@ -28,8 +28,8 @@ import org.springframework.context.annotation.Import; * Enable JMS listener annotated endpoints that are created under the cover * by a {@link org.springframework.jms.config.JmsListenerContainerFactory * JmsListenerContainerFactory}. To be used on - * @{@link org.springframework.context.annotation.Configuration Configuration} classes - * as follows: + * {@link org.springframework.context.annotation.Configuration Configuration} + * classes as follows: * *
  * @Configuration
@@ -52,8 +52,8 @@ import org.springframework.context.annotation.Import;
  * used in the sample above, provides the necessary configuration options that are supported by
  * the underlying {@link org.springframework.jms.listener.MessageListenerContainer MessageListenerContainer}.
  *
- * 

{@code @EnableJms} enables detection of @{@link JmsListener} annotations on - * any Spring-managed bean in the container. For example, given a class {@code MyService} + *

{@code @EnableJms} enables detection of {@link JmsListener} annotations on any + * Spring-managed bean in the container. For example, given a class {@code MyService}: * *

  * package com.acme.foo;
@@ -103,7 +103,7 @@ import org.springframework.context.annotation.Import;
  *
  * 

Annotated methods can use flexible signature; in particular, it is possible to use * the {@link org.springframework.messaging.Message Message} abstraction and related annotations, - * see @{@link JmsListener} Javadoc for more details. For instance, the following would + * see {@link JmsListener} Javadoc for more details. For instance, the following would * inject the content of the message and a a custom "myCounter" JMS header: * *

@@ -163,7 +163,7 @@ import org.springframework.context.annotation.Import;
  * are created and managed. The example below also demonstrates how to customize the
  * {@code JmsHandlerMethodFactory} to use with a custom {@link org.springframework.validation.Validator
  * Validator} so that payloads annotated with {@link org.springframework.validation.annotation.Validated
- * @Validated} are first validated against a custom {@code Validator}.
+ * Validated} are first validated against a custom {@code Validator}.
  *
  * 
  * @Configuration
diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java
index bd82b1855c3..5649257b21f 100644
--- a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java
+++ b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java
@@ -79,7 +79,7 @@ public @interface JmsListener {
 	/**
 	 * The unique identifier of the container managing this endpoint.
 	 * 

if none is specified an auto-generated one is provided. - * @see org.springframework.jms.config.JmsListenerEndpointRegistry#getContainer(String) + * @see org.springframework.jms.config.JmsListenerEndpointRegistry#getListenerContainer(String) */ String id() default ""; diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java index 9e1628b72e9..187f1fb5de6 100644 --- a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java +++ b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java @@ -43,21 +43,21 @@ import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; /** - * Bean post-processor that registers methods annotated with @{@link JmsListener} + * Bean post-processor that registers methods annotated with {@link JmsListener} * to be invoked by a JMS message listener container created under the cover * by a {@link org.springframework.jms.config.JmsListenerContainerFactory} according * to the parameters of the annotation. * - *

Annotated methods can use flexible arguments as defined by @{@link JmsListener}. + *

Annotated methods can use flexible arguments as defined by {@link JmsListener}. * *

This post-processor is automatically registered by Spring's - * {@code } XML element, and also by the @{@link EnableJms} + * {@code } XML element, and also by the {@link EnableJms} * annotation. * *

Auto-detect any {@link JmsListenerConfigurer} instances in the container, * allowing for customization of the registry to be used, the default container * factory or for fine-grained control over endpoints registration. See - * @{@link EnableJms} Javadoc for complete usage details. + * {@link EnableJms} Javadoc for complete usage details. * * @author Stephane Nicoll * @since 4.1 @@ -68,7 +68,6 @@ import org.springframework.util.StringUtils; * @see JmsListenerEndpointRegistry * @see org.springframework.jms.config.AbstractJmsListenerEndpoint * @see MethodJmsListenerEndpoint - * @see MessageListenerFactory */ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, ApplicationContextAware, ApplicationListener { @@ -78,9 +77,6 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor */ static final String DEFAULT_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "jmsListenerContainerFactory"; - private final AtomicInteger counter = new AtomicInteger(); - - private ApplicationContext applicationContext; private JmsListenerEndpointRegistry endpointRegistry; @@ -88,11 +84,16 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor private final JmsHandlerMethodFactoryAdapter jmsHandlerMethodFactory = new JmsHandlerMethodFactoryAdapter(); + private ApplicationContext applicationContext; + private final JmsListenerEndpointRegistrar registrar = new JmsListenerEndpointRegistrar(); + private final AtomicInteger counter = new AtomicInteger(); + + @Override - public void setApplicationContext(ApplicationContext applicationContext) { - this.applicationContext = applicationContext; + public int getOrder() { + return LOWEST_PRECEDENCE; } /** @@ -105,8 +106,7 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor /** * Set the name of the {@link JmsListenerContainerFactory} to use by default. - *

If none is specified, {@value #DEFAULT_JMS_LISTENER_CONTAINER_FACTORY_BEAN_NAME} - * is assumed to be defined. + *

If none is specified, "jmsListenerContainerFactory" is assumed to be defined. */ public void setContainerFactoryBeanName(String containerFactoryBeanName) { this.containerFactoryBeanName = containerFactoryBeanName; @@ -125,10 +125,11 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor } @Override - public int getOrder() { - return LOWEST_PRECEDENCE; + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; } + @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; @@ -162,17 +163,17 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor catch (NoSuchMethodException ex) { throw new IllegalStateException(String.format( "@JmsListener method '%s' found on bean target class '%s', " + - "but not found in any interface(s) for bean JDK proxy. Either " + - "pull the method up to an interface or switch to subclass (CGLIB) " + - "proxies by setting proxy-target-class/proxyTargetClass " + - "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName())); + "but not found in any interface(s) for bean JDK proxy. Either " + + "pull the method up to an interface or switch to subclass (CGLIB) " + + "proxies by setting proxy-target-class/proxyTargetClass " + + "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName())); } } MethodJmsListenerEndpoint endpoint = new MethodJmsListenerEndpoint(); endpoint.setBean(bean); endpoint.setMethod(method); - endpoint.setJmsHandlerMethodFactory(jmsHandlerMethodFactory); + endpoint.setJmsHandlerMethodFactory(this.jmsHandlerMethodFactory); endpoint.setId(getEndpointId(jmsListener)); endpoint.setDestination(jmsListener.destination()); if (StringUtils.hasText(jmsListener.selector())) { @@ -189,12 +190,12 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor String containerFactoryBeanName = jmsListener.containerFactory(); if (StringUtils.hasText(containerFactoryBeanName)) { try { - factory = applicationContext.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class); + factory = this.applicationContext.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class); } - catch (NoSuchBeanDefinitionException e) { - throw new BeanInitializationException("Could not register jms listener endpoint on [" - + method + "], no " + JmsListenerContainerFactory.class.getSimpleName() + " with id '" - + containerFactoryBeanName + "' was found in the application context", e); + catch (NoSuchBeanDefinitionException ex) { + throw new BeanInitializationException("Could not register jms listener endpoint on [" + + method + "], no " + JmsListenerContainerFactory.class.getSimpleName() + " with id '" + + containerFactoryBeanName + "' was found in the application context", ex); } } @@ -214,22 +215,20 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor configurer.configureJmsListeners(registrar); } - registrar.setApplicationContext(this.applicationContext); + this.registrar.setApplicationContext(this.applicationContext); - if (registrar.getEndpointRegistry() == null) { - if (endpointRegistry == null) { - endpointRegistry = applicationContext - .getBean(AnnotationConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, - JmsListenerEndpointRegistry.class); + if (this.registrar.getEndpointRegistry() == null) { + if (this.endpointRegistry == null) { + this.endpointRegistry = this.applicationContext.getBean( + AnnotationConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class); } - registrar.setEndpointRegistry(endpointRegistry); + this.registrar.setEndpointRegistry(this.endpointRegistry); } if (this.containerFactoryBeanName != null) { - registrar.setContainerFactoryBeanName(this.containerFactoryBeanName); + this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName); } - // Set the custom handler method factory once resolved by the configurer JmsHandlerMethodFactory handlerMethodFactory = registrar.getJmsHandlerMethodFactory(); if (handlerMethodFactory != null) { @@ -238,10 +237,10 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor // Create all the listeners and starts them try { - registrar.afterPropertiesSet(); + this.registrar.afterPropertiesSet(); } - catch (Exception e) { - throw new BeanInitializationException(e.getMessage(), e); + catch (Exception ex) { + throw new BeanInitializationException("Failed to initialize JmsListenerEndpointRegistrar", ex); } } @@ -250,11 +249,11 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor return jmsListener.id(); } else { - return "org.springframework.jms.JmsListenerEndpointContainer#" - + counter.getAndIncrement(); + return "org.springframework.jms.JmsListenerEndpointContainer#" + counter.getAndIncrement(); } } + /** * An {@link JmsHandlerMethodFactory} adapter that offers a configurable underlying * instance to use. Useful if the factory to use is determined once the endpoints @@ -265,7 +264,7 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor private JmsHandlerMethodFactory jmsHandlerMethodFactory; - private void setJmsHandlerMethodFactory(JmsHandlerMethodFactory jmsHandlerMethodFactory) { + public void setJmsHandlerMethodFactory(JmsHandlerMethodFactory jmsHandlerMethodFactory) { this.jmsHandlerMethodFactory = jmsHandlerMethodFactory; } @@ -275,10 +274,10 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor } private JmsHandlerMethodFactory getJmsHandlerMethodFactory() { - if (jmsHandlerMethodFactory == null) { - jmsHandlerMethodFactory = createDefaultJmsHandlerMethodFactory(); + if (this.jmsHandlerMethodFactory == null) { + this.jmsHandlerMethodFactory = createDefaultJmsHandlerMethodFactory(); } - return jmsHandlerMethodFactory; + return this.jmsHandlerMethodFactory; } private JmsHandlerMethodFactory createDefaultJmsHandlerMethodFactory() { diff --git a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java index 92df13e50a8..cfaf8de1c14 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java @@ -16,7 +16,6 @@ package org.springframework.jms.config; - import javax.jms.ConnectionFactory; import org.apache.commons.logging.Log; @@ -55,8 +54,13 @@ public abstract class AbstractJmsListenerContainerFactorySubclasses can inherit from this method to apply extra * configuration if necessary. */ protected void initializeContainer(C instance) { - } } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerEndpoint.java b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerEndpoint.java index 7cf0909ee67..d244dd2fc13 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerEndpoint.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerEndpoint.java @@ -112,60 +112,34 @@ public abstract class AbstractJmsListenerEndpoint implements JmsListenerEndpoint * Return the concurrency for the listener, if any. */ public String getConcurrency() { - return concurrency; + return this.concurrency; } + @Override - public void setupMessageContainer(MessageListenerContainer container) { - if (container instanceof AbstractMessageListenerContainer) { // JMS - setupJmsMessageContainer((AbstractMessageListenerContainer) container); - } - else if (container instanceof JmsMessageEndpointManager) { // JCA - setupJcaMessageContainer((JmsMessageEndpointManager) container); + public void setupListenerContainer(MessageListenerContainer listenerContainer) { + if (listenerContainer instanceof AbstractMessageListenerContainer) { + setupJmsListenerContainer((AbstractMessageListenerContainer) listenerContainer); } else { - throw new IllegalArgumentException("Could not configure endpoint with the specified container '" + - container + "' Only JMS (" + AbstractMessageListenerContainer.class.getName() + - " subclass) or JCA (" + JmsMessageEndpointManager.class.getName() + ") are supported."); + new JcaEndpointConfigurer().configureEndpoint(listenerContainer); } } - protected void setupJmsMessageContainer(AbstractMessageListenerContainer container) { + private void setupJmsListenerContainer(AbstractMessageListenerContainer listenerContainer) { if (getDestination() != null) { - container.setDestinationName(getDestination()); + listenerContainer.setDestinationName(getDestination()); } if (getSubscription() != null) { - container.setDurableSubscriptionName(getSubscription()); + listenerContainer.setSubscriptionName(getSubscription()); } if (getSelector() != null) { - container.setMessageSelector(getSelector()); + listenerContainer.setMessageSelector(getSelector()); } if (getConcurrency() != null) { - container.setConcurrency(getConcurrency()); + listenerContainer.setConcurrency(getConcurrency()); } - setupMessageListener(container); - } - - protected void setupJcaMessageContainer(JmsMessageEndpointManager container) { - JmsActivationSpecConfig activationSpecConfig = container.getActivationSpecConfig(); - if (activationSpecConfig == null) { - activationSpecConfig = new JmsActivationSpecConfig(); - container.setActivationSpecConfig(activationSpecConfig); - } - - if (getDestination() != null) { - activationSpecConfig.setDestinationName(getDestination()); - } - if (getSubscription() != null) { - activationSpecConfig.setDurableSubscriptionName(getSubscription()); - } - if (getSelector() != null) { - activationSpecConfig.setMessageSelector(getSelector()); - } - if (getConcurrency() != null) { - activationSpecConfig.setConcurrency(getConcurrency()); - } - setupMessageListener(container); + setupMessageListener(listenerContainer); } /** @@ -196,4 +170,43 @@ public abstract class AbstractJmsListenerEndpoint implements JmsListenerEndpoint return getEndpointDescription().toString(); } + + /** + * Inner class to avoid a hard dependency on the JCA API. + */ + private class JcaEndpointConfigurer { + + public void configureEndpoint(Object listenerContainer) { + if (listenerContainer instanceof JmsMessageEndpointManager) { + setupJcaMessageContainer((JmsMessageEndpointManager) listenerContainer); + } + else { + throw new IllegalArgumentException("Could not configure endpoint with the specified container '" + + listenerContainer + "' Only JMS (" + AbstractMessageListenerContainer.class.getName() + + " subclass) or JCA (" + JmsMessageEndpointManager.class.getName() + ") are supported."); + } + } + + private void setupJcaMessageContainer(JmsMessageEndpointManager container) { + JmsActivationSpecConfig activationSpecConfig = container.getActivationSpecConfig(); + if (activationSpecConfig == null) { + activationSpecConfig = new JmsActivationSpecConfig(); + container.setActivationSpecConfig(activationSpecConfig); + } + if (getDestination() != null) { + activationSpecConfig.setDestinationName(getDestination()); + } + if (getSubscription() != null) { + activationSpecConfig.setSubscriptionName(getSubscription()); + } + if (getSelector() != null) { + activationSpecConfig.setMessageSelector(getSelector()); + } + if (getConcurrency() != null) { + activationSpecConfig.setConcurrency(getConcurrency()); + } + setupMessageListener(container); + } + } + } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java b/spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java index 9384de78ebe..e4608b0929e 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java @@ -74,6 +74,10 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { protected static final String DESTINATION_TYPE_DURABLE_TOPIC = "durableTopic"; + protected static final String DESTINATION_TYPE_SHARED_TOPIC = "sharedTopic"; + + protected static final String DESTINATION_TYPE_SHARED_DURABLE_TOPIC = "sharedDurableTopic"; + protected static final String CLIENT_ID_ATTRIBUTE = "client-id"; protected static final String ACKNOWLEDGE_ATTRIBUTE = "acknowledge"; @@ -98,14 +102,21 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { @Override public BeanDefinition parse(Element element, ParserContext parserContext) { CompositeComponentDefinition compositeDef = - new CompositeComponentDefinition(element.getTagName(), parserContext.extractSource(element)); + new CompositeComponentDefinition(element.getTagName(), parserContext.extractSource(element)); parserContext.pushContainingComponent(compositeDef); - // First parse the common configuration of the container - PropertyValues containerValues = parseProperties(element, parserContext); + PropertyValues commonProperties = parseCommonContainerProperties(element, parserContext); + PropertyValues specificProperties = parseSpecificContainerProperties(element, parserContext); - // Expose the factory if requested - parseContainerFactory(element, parserContext, containerValues); + String factoryId = element.getAttribute(FACTORY_ID_ATTRIBUTE); + if (StringUtils.hasText(factoryId)) { + RootBeanDefinition beanDefinition = createContainerFactory( + factoryId, element, parserContext, commonProperties, specificProperties); + if (beanDefinition != null) { + beanDefinition.setSource(parserContext.extractSource(element)); + parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinition, factoryId)); + } + } NodeList childNodes = element.getChildNodes(); for (int i = 0; i < childNodes.getLength(); i++) { @@ -113,9 +124,7 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { if (child.getNodeType() == Node.ELEMENT_NODE) { String localName = parserContext.getDelegate().getLocalName(child); if (LISTENER_ELEMENT.equals(localName)) { - ListenerContainerParserContext context = new ListenerContainerParserContext( - element, (Element) child, containerValues, parserContext); - parseListener(context); + parseListener(element, (Element) child, parserContext, commonProperties, specificProperties); } } } @@ -124,45 +133,12 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { return null; } - /** - * Parse the common properties for all listeners as defined by the specified - * container {@link Element}. - */ - protected abstract PropertyValues parseProperties(Element containerEle, ParserContext parserContext); - - /** - * Create the {@link BeanDefinition} for the container factory using the specified - * shared property values. - */ - protected abstract RootBeanDefinition createContainerFactory(String factoryId, - Element containerElement, PropertyValues propertyValues); - - /** - * Create the container {@link BeanDefinition} for the specified context. - */ - protected abstract BeanDefinition createContainer(ListenerContainerParserContext context); - - - private void parseContainerFactory(Element element, ParserContext parserContext, PropertyValues propertyValues) { - String factoryId = element.getAttribute(FACTORY_ID_ATTRIBUTE); - if (StringUtils.hasText(factoryId)) { - RootBeanDefinition beanDefinition = createContainerFactory(factoryId, element, propertyValues); - if (beanDefinition != null) { - beanDefinition.setSource(parserContext.extractSource(element)); - parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinition, factoryId)); - } - } - } - - private void parseListener(ListenerContainerParserContext context) { - ParserContext parserContext = context.getParserContext(); - PropertyValues containerValues = context.getContainerValues(); - Element listenerEle = context.getListenerElement(); + private void parseListener(Element containerEle, Element listenerEle, ParserContext parserContext, + PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) { RootBeanDefinition listenerDef = new RootBeanDefinition(); listenerDef.setSource(parserContext.extractSource(listenerEle)); - - BeanDefinition containerDef = createContainer(context); + listenerDef.setBeanClassName("org.springframework.jms.listener.adapter.MessageListenerAdapter"); String ref = listenerEle.getAttribute(REF_ATTRIBUTE); if (!StringUtils.hasText(ref)) { @@ -183,15 +159,18 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { } listenerDef.getPropertyValues().add("defaultListenerMethod", method); - PropertyValue messageConverterPv = getMessageConverter(containerValues); + PropertyValue messageConverterPv = commonContainerProperties.getPropertyValue("messageConverter"); if (messageConverterPv != null) { listenerDef.getPropertyValues().addPropertyValue(messageConverterPv); } + BeanDefinition containerDef = createContainer( + containerEle, listenerEle, parserContext, commonContainerProperties, specificContainerProperties); + containerDef.getPropertyValues().add("messageListener", listenerDef); if (listenerEle.hasAttribute(RESPONSE_DESTINATION_ATTRIBUTE)) { String responseDestination = listenerEle.getAttribute(RESPONSE_DESTINATION_ATTRIBUTE); - boolean pubSubDomain = indicatesPubSub(containerValues); + Boolean pubSubDomain = (Boolean) commonContainerProperties.getPropertyValue("pubSubDomain").getValue(); listenerDef.getPropertyValues().add( pubSubDomain ? "defaultResponseTopicName" : "defaultResponseQueueName", responseDestination); if (containerDef.getPropertyValues().contains("destinationResolver")) { @@ -200,9 +179,6 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { } } - listenerDef.setBeanClassName("org.springframework.jms.listener.adapter.MessageListenerAdapter"); - - containerDef.getPropertyValues().add("messageListener", listenerDef); String containerBeanName = listenerEle.getAttribute(ID_ATTRIBUTE); // If no bean id is given auto generate one using the ReaderContext's BeanNameGenerator @@ -214,21 +190,13 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { parserContext.registerBeanComponent(new BeanComponentDefinition(containerDef, containerBeanName)); } - protected boolean indicatesPubSub(PropertyValues propertyValues) { - return false; - } - - protected PropertyValue getMessageConverter(PropertyValues containerValues) { - return containerValues.getPropertyValue("messageConverter"); - } - - protected void parseListenerConfiguration(Element ele, ParserContext parserContext, BeanDefinition configDef) { + protected void parseListenerConfiguration(Element ele, ParserContext parserContext, MutablePropertyValues configValues) { String destination = ele.getAttribute(DESTINATION_ATTRIBUTE); if (!StringUtils.hasText(destination)) { parserContext.getReaderContext().error( "Listener 'destination' attribute contains empty value.", ele); } - configDef.getPropertyValues().add("destinationName", destination); + configValues.add("destinationName", destination); if (ele.hasAttribute(SUBSCRIPTION_ATTRIBUTE)) { String subscription = ele.getAttribute(SUBSCRIPTION_ATTRIBUTE); @@ -236,7 +204,7 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { parserContext.getReaderContext().error( "Listener 'subscription' attribute contains empty value.", ele); } - configDef.getPropertyValues().add("durableSubscriptionName", subscription); + configValues.add("subscriptionName", subscription); } if (ele.hasAttribute(SELECTOR_ATTRIBUTE)) { @@ -245,7 +213,7 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { parserContext.getReaderContext().error( "Listener 'selector' attribute contains empty value.", ele); } - configDef.getPropertyValues().add("messageSelector", selector); + configValues.add("messageSelector", selector); } if (ele.hasAttribute(CONCURRENCY_ATTRIBUTE)) { @@ -254,17 +222,27 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { parserContext.getReaderContext().error( "Listener 'concurrency' attribute contains empty value.", ele); } - configDef.getPropertyValues().add("concurrency", concurrency); + configValues.add("concurrency", concurrency); } } - protected PropertyValues parseCommonContainerProperties(Element ele, ParserContext parserContext) { - MutablePropertyValues propertyValues = new MutablePropertyValues(); + protected MutablePropertyValues parseCommonContainerProperties(Element containerEle, ParserContext parserContext) { + MutablePropertyValues properties = new MutablePropertyValues(); - String destinationType = ele.getAttribute(DESTINATION_TYPE_ATTRIBUTE); + String destinationType = containerEle.getAttribute(DESTINATION_TYPE_ATTRIBUTE); boolean pubSubDomain = false; boolean subscriptionDurable = false; - if (DESTINATION_TYPE_DURABLE_TOPIC.equals(destinationType)) { + boolean subscriptionShared = false; + if (DESTINATION_TYPE_SHARED_DURABLE_TOPIC.equals(destinationType)) { + pubSubDomain = true; + subscriptionDurable = true; + subscriptionShared = true; + } + else if (DESTINATION_TYPE_SHARED_TOPIC.equals(destinationType)) { + pubSubDomain = true; + subscriptionShared = true; + } + else if (DESTINATION_TYPE_DURABLE_TOPIC.equals(destinationType)) { pubSubDomain = true; subscriptionDurable = true; } @@ -275,23 +253,57 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { // the default: queue } else { - parserContext.getReaderContext().error("Invalid listener container 'destination-type': " + - "only \"queue\", \"topic\" and \"durableTopic\" supported.", ele); + parserContext.getReaderContext().error("Invalid listener container 'destination-type': only " + + "\"queue\", \"topic\", \"durableTopic\", \"sharedTopic\", \"sharedDurableTopic\" supported.", containerEle); } - propertyValues.add("pubSubDomain", pubSubDomain); - propertyValues.add("subscriptionDurable", subscriptionDurable); + properties.add("pubSubDomain", pubSubDomain); + properties.add("subscriptionDurable", subscriptionDurable); + properties.add("subscriptionShared", subscriptionShared); - if (ele.hasAttribute(CLIENT_ID_ATTRIBUTE)) { - String clientId = ele.getAttribute(CLIENT_ID_ATTRIBUTE); + if (containerEle.hasAttribute(CLIENT_ID_ATTRIBUTE)) { + String clientId = containerEle.getAttribute(CLIENT_ID_ATTRIBUTE); if (!StringUtils.hasText(clientId)) { parserContext.getReaderContext().error( - "Listener 'client-id' attribute contains empty value.", ele); + "Listener 'client-id' attribute contains empty value.", containerEle); } - propertyValues.add("clientId", clientId); + properties.add("clientId", clientId); } - return propertyValues; + + if (containerEle.hasAttribute(MESSAGE_CONVERTER_ATTRIBUTE)) { + String messageConverter = containerEle.getAttribute(MESSAGE_CONVERTER_ATTRIBUTE); + if (!StringUtils.hasText(messageConverter)) { + parserContext.getReaderContext().error( + "listener container 'message-converter' attribute contains empty value.", containerEle); + } + else { + properties.add("messageConverter", new RuntimeBeanReference(messageConverter)); + } + } + + return properties; } + + /** + * Parse the common properties for all listeners as defined by the specified + * container {@link Element}. + */ + protected abstract MutablePropertyValues parseSpecificContainerProperties(Element containerEle, ParserContext parserContext); + + /** + * Create the {@link BeanDefinition} for the container factory using the specified + * shared property values. + */ + protected abstract RootBeanDefinition createContainerFactory(String factoryId, Element containerEle, ParserContext parserContext, + PropertyValues commonContainerProperties, PropertyValues specificContainerProperties); + + /** + * Create the container {@link BeanDefinition} for the specified context. + */ + protected abstract RootBeanDefinition createContainer(Element containerEle, Element listenerEle, ParserContext parserContext, + PropertyValues commonContainerProperties, PropertyValues specificContainerProperties); + + protected Integer parseAcknowledgeMode(Element ele, ParserContext parserContext) { String acknowledge = ele.getAttribute(ACKNOWLEDGE_ATTRIBUTE); if (StringUtils.hasText(acknowledge)) { @@ -316,45 +328,4 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser { } } - protected boolean indicatesPubSubConfig(PropertyValues configuration) { - return (Boolean) configuration.getPropertyValue("pubSubDomain").getValue(); - } - - - protected static class ListenerContainerParserContext { - private final Element containerElement; - private final Element listenerElement; - private final PropertyValues containerValues; - private final ParserContext parserContext; - - public ListenerContainerParserContext(Element containerElement, Element listenerElement, - PropertyValues containerValues, ParserContext parserContext) { - this.containerElement = containerElement; - this.listenerElement = listenerElement; - this.containerValues = containerValues; - this.parserContext = parserContext; - } - - public Element getContainerElement() { - return containerElement; - } - - public Element getListenerElement() { - return listenerElement; - } - - public PropertyValues getContainerValues() { - return containerValues; - } - - public ParserContext getParserContext() { - return parserContext; - } - - public Object getSource() { - return parserContext.extractSource(containerElement); - } - - } - } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/AnnotationDrivenJmsBeanDefinitionParser.java b/spring-jms/src/main/java/org/springframework/jms/config/AnnotationDrivenJmsBeanDefinitionParser.java index 10c95094f03..2efa275c8fb 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/AnnotationDrivenJmsBeanDefinitionParser.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/AnnotationDrivenJmsBeanDefinitionParser.java @@ -18,7 +18,6 @@ package org.springframework.jms.config; import org.w3c.dom.Element; -import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.parsing.BeanComponentDefinition; @@ -36,7 +35,7 @@ import org.springframework.util.StringUtils; * @author Stephane Nicoll * @since 4.1 */ -final class AnnotationDrivenJmsBeanDefinitionParser implements BeanDefinitionParser { +class AnnotationDrivenJmsBeanDefinitionParser implements BeanDefinitionParser { @Override public BeanDefinition parse(Element element, ParserContext parserContext) { @@ -89,7 +88,6 @@ final class AnnotationDrivenJmsBeanDefinitionParser implements BeanDefinitionPar BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition( "org.springframework.jms.config.JmsListenerEndpointRegistry"); builder.getRawBeanDefinition().setSource(source); - builder.setAutowireMode(AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE); registerInfrastructureBean(parserContext, builder, AnnotationConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME); } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/DefaultJcaListenerContainerFactory.java b/spring-jms/src/main/java/org/springframework/jms/config/DefaultJcaListenerContainerFactory.java index ec597c1d86d..e730719a0be 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/DefaultJcaListenerContainerFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/DefaultJcaListenerContainerFactory.java @@ -25,12 +25,13 @@ import org.springframework.jms.support.destination.DestinationResolver; /** * A {@link JmsListenerContainerFactory} implementation to build a - * JCA {@link JmsMessageEndpointManager}. + * JCA-based {@link JmsMessageEndpointManager}. * * @author Stephane Nicoll * @since 4.1 */ -public class DefaultJcaListenerContainerFactory implements JmsListenerContainerFactory { +public class DefaultJcaListenerContainerFactory extends JmsActivationSpecConfig + implements JmsListenerContainerFactory { private ResourceAdapter resourceAdapter; @@ -40,7 +41,8 @@ public class DefaultJcaListenerContainerFactory implements JmsListenerContainerF private Object transactionManager; - private JmsActivationSpecConfig activationSpecConfig; + private Integer phase; + /** * @see JmsMessageEndpointManager#setResourceAdapter(ResourceAdapter) @@ -71,28 +73,15 @@ public class DefaultJcaListenerContainerFactory implements JmsListenerContainerF } /** - * @see JmsMessageEndpointManager#setActivationSpecConfig(JmsActivationSpecConfig) + * @see JmsMessageEndpointManager#setPhase(int) */ - public void setActivationSpecConfig(JmsActivationSpecConfig activationSpecConfig) { - this.activationSpecConfig = activationSpecConfig; + public void setPhase(int phase) { + this.phase = phase; } - /** - * Return the current {@link JmsActivationSpecConfig}. - */ - public JmsActivationSpecConfig getActivationSpecConfig() { - return activationSpecConfig; - } - - /** - * Create an empty container instance. - */ - protected JmsMessageEndpointManager createContainerInstance() { - return new JmsMessageEndpointManager(); - } @Override - public JmsMessageEndpointManager createMessageListenerContainer(JmsListenerEndpoint endpoint) { + public JmsMessageEndpointManager createListenerContainer(JmsListenerEndpoint endpoint) { if (this.destinationResolver != null && this.activationSpecFactory != null) { throw new IllegalStateException("Specify either 'activationSpecFactory' or " + "'destinationResolver', not both. If you define a dedicated JmsActivationSpecFactory bean, " + @@ -113,13 +102,21 @@ public class DefaultJcaListenerContainerFactory implements JmsListenerContainerF if (this.transactionManager != null) { instance.setTransactionManager(this.transactionManager); } - if (this.activationSpecConfig != null) { - instance.setActivationSpecConfig(this.activationSpecConfig); + if (this.phase != null) { + instance.setPhase(this.phase); } - endpoint.setupMessageContainer(instance); + instance.setActivationSpecConfig(this); + endpoint.setupListenerContainer(instance); return instance; } + /** + * Create an empty container instance. + */ + protected JmsMessageEndpointManager createContainerInstance() { + return new JmsMessageEndpointManager(); + } + } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/DefaultJmsListenerContainerFactory.java b/spring-jms/src/main/java/org/springframework/jms/config/DefaultJmsListenerContainerFactory.java index 7118f88feb8..26959cc321e 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/DefaultJmsListenerContainerFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/DefaultJmsListenerContainerFactory.java @@ -23,12 +23,11 @@ import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.backoff.BackOff; /** - * A {@link JmsListenerContainerFactory} implementation to build regular + * A {@link JmsListenerContainerFactory} implementation to build a regular * {@link DefaultMessageListenerContainer}. * - *

This should be the default for most users and a good transition - * paths for those that are used to build such container definition - * manually. + *

This should be the default for most users and a good transition paths + * for those that are used to build such container definition manually. * * @author Stephane Nicoll * @since 4.1 diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java b/spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java index c46a8d3e04b..03feb01e83e 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java @@ -19,9 +19,7 @@ package org.springframework.jms.config; import org.w3c.dom.Element; import org.springframework.beans.MutablePropertyValues; -import org.springframework.beans.PropertyValue; import org.springframework.beans.PropertyValues; -import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.ParserContext; @@ -42,83 +40,64 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser { @Override - protected PropertyValues parseProperties(Element containerEle, ParserContext parserContext) { - final MutablePropertyValues properties = new MutablePropertyValues(); - PropertyValues containerValues = parseContainerProperties(containerEle, parserContext); + protected RootBeanDefinition createContainerFactory(String factoryId, Element containerEle, ParserContext parserContext, + PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) { - // Common values are added to the activationSpecConfig - PropertyValues commonValues = parseCommonContainerProperties(containerEle, parserContext); - BeanDefinition beanDefinition = getActivationSpecConfigBeanDefinition(containerValues); - beanDefinition.getPropertyValues().addPropertyValues(commonValues); + RootBeanDefinition factoryDef = new RootBeanDefinition(); + factoryDef.setBeanClassName("org.springframework.jms.config.DefaultJcaListenerContainerFactory"); - properties.addPropertyValues(containerValues); - return properties; + factoryDef.getPropertyValues().addPropertyValues(commonContainerProperties); + factoryDef.getPropertyValues().addPropertyValues(specificContainerProperties); + + return factoryDef; } @Override - protected RootBeanDefinition createContainerFactory(String factoryId, - Element containerElement, PropertyValues propertyValues) { - RootBeanDefinition beanDefinition = new RootBeanDefinition(); - beanDefinition.setBeanClassName("org.springframework.jms.config.DefaultJcaListenerContainerFactory"); - beanDefinition.getPropertyValues().addPropertyValues(propertyValues); - return beanDefinition; - } + protected RootBeanDefinition createContainer(Element containerEle, Element listenerEle, ParserContext parserContext, + PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) { - @Override - protected BeanDefinition createContainer(ListenerContainerParserContext context) { RootBeanDefinition containerDef = new RootBeanDefinition(); - containerDef.setSource(context.getSource()); + containerDef.setSource(parserContext.extractSource(containerEle)); containerDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsMessageEndpointManager"); + containerDef.getPropertyValues().addPropertyValues(specificContainerProperties); - applyContainerValues(context, containerDef); + RootBeanDefinition configDef = new RootBeanDefinition(); + configDef.setSource(parserContext.extractSource(containerEle)); + configDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsActivationSpecConfig"); + configDef.getPropertyValues().addPropertyValues(commonContainerProperties); + parseListenerConfiguration(listenerEle, parserContext, configDef.getPropertyValues()); - - BeanDefinition activationSpec = getActivationSpecConfigBeanDefinition(containerDef.getPropertyValues()); - parseListenerConfiguration(context.getListenerElement(), context.getParserContext(), activationSpec); - - String phase = context.getContainerElement().getAttribute(PHASE_ATTRIBUTE); - if (StringUtils.hasText(phase)) { - containerDef.getPropertyValues().add("phase", phase); - } + containerDef.getPropertyValues().add("activationSpecConfig", configDef); return containerDef; } - /** - * The property values provided by the factory element contains a mutable property (the - * activation spec config). To avoid changing the bean definition from the parent, a clone - * bean definition is created for the container being configured. - */ - private void applyContainerValues(ListenerContainerParserContext context, RootBeanDefinition containerDef) { - // Apply settings from the container - containerDef.getPropertyValues().addPropertyValues(context.getContainerValues()); + @Override + protected MutablePropertyValues parseCommonContainerProperties(Element containerEle, ParserContext parserContext) { + MutablePropertyValues properties = super.parseCommonContainerProperties(containerEle, parserContext); - // Clone the activationSpecConfig property value as it is mutable - PropertyValue pv = containerDef.getPropertyValues().getPropertyValue("activationSpecConfig"); - RootBeanDefinition activationSpecConfig = new RootBeanDefinition((RootBeanDefinition) pv.getValue()); - containerDef.getPropertyValues().add("activationSpecConfig", activationSpecConfig); + Integer acknowledgeMode = parseAcknowledgeMode(containerEle, parserContext); + if (acknowledgeMode != null) { + properties.add("acknowledgeMode", acknowledgeMode); + } + + String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE); + if (StringUtils.hasText(concurrency)) { + properties.add("concurrency", concurrency); + } + + String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE); + if (StringUtils.hasText(prefetch)) { + properties.add("prefetchSize", new Integer(prefetch)); + } + + return properties; } @Override - protected boolean indicatesPubSub(PropertyValues propertyValues) { - BeanDefinition configDef = getActivationSpecConfigBeanDefinition(propertyValues); - return indicatesPubSubConfig(configDef.getPropertyValues()); - } + protected MutablePropertyValues parseSpecificContainerProperties(Element containerEle, ParserContext parserContext) { + MutablePropertyValues properties = new MutablePropertyValues(); - @Override - protected PropertyValue getMessageConverter(PropertyValues containerValues) { - BeanDefinition configDef = getActivationSpecConfigBeanDefinition(containerValues); - return super.getMessageConverter(configDef.getPropertyValues()); - } - - private BeanDefinition getActivationSpecConfigBeanDefinition(PropertyValues containerValues) { - PropertyValue activationSpecConfig = containerValues.getPropertyValue("activationSpecConfig"); - return (BeanDefinition) activationSpecConfig.getValue(); - } - - private PropertyValues parseContainerProperties(Element containerEle, - ParserContext parserContext) { - MutablePropertyValues propertyValues = new MutablePropertyValues(); if (containerEle.hasAttribute(RESOURCE_ADAPTER_ATTRIBUTE)) { String resourceAdapterBeanName = containerEle.getAttribute(RESOURCE_ADAPTER_ATTRIBUTE); if (!StringUtils.hasText(resourceAdapterBeanName)) { @@ -126,8 +105,7 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser { "Listener container 'resource-adapter' attribute contains empty value.", containerEle); } else { - propertyValues.add("resourceAdapter", - new RuntimeBeanReference(resourceAdapterBeanName)); + properties.add("resourceAdapter", new RuntimeBeanReference(resourceAdapterBeanName)); } } @@ -139,54 +117,23 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser { "'destination-resolver', not both. If you define a dedicated JmsActivationSpecFactory bean, " + "specify the custom DestinationResolver there (if possible).", containerEle); } - propertyValues.add("activationSpecFactory", - new RuntimeBeanReference(activationSpecFactoryBeanName)); + properties.add("activationSpecFactory", new RuntimeBeanReference(activationSpecFactoryBeanName)); } if (StringUtils.hasText(destinationResolverBeanName)) { - propertyValues.add("destinationResolver", - new RuntimeBeanReference(destinationResolverBeanName)); + properties.add("destinationResolver", new RuntimeBeanReference(destinationResolverBeanName)); } String transactionManagerBeanName = containerEle.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE); if (StringUtils.hasText(transactionManagerBeanName)) { - propertyValues.add("transactionManager", - new RuntimeBeanReference(transactionManagerBeanName)); + properties.add("transactionManager", new RuntimeBeanReference(transactionManagerBeanName)); } - RootBeanDefinition configDef = new RootBeanDefinition(); - configDef.setSource(parserContext.extractSource(configDef)); - configDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsActivationSpecConfig"); - - - Integer acknowledgeMode = parseAcknowledgeMode(containerEle, parserContext); - if (acknowledgeMode != null) { - configDef.getPropertyValues().add("acknowledgeMode", acknowledgeMode); - } - String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE); - if (StringUtils.hasText(concurrency)) { - configDef.getPropertyValues().add("concurrency", concurrency); + String phase = containerEle.getAttribute(PHASE_ATTRIBUTE); + if (StringUtils.hasText(phase)) { + properties.add("phase", phase); } - String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE); - if (StringUtils.hasText(prefetch)) { - configDef.getPropertyValues().add("prefetchSize", new Integer(prefetch)); - } - - if (containerEle.hasAttribute(MESSAGE_CONVERTER_ATTRIBUTE)) { - String messageConverter = containerEle.getAttribute(MESSAGE_CONVERTER_ATTRIBUTE); - if (!StringUtils.hasText(messageConverter)) { - parserContext.getReaderContext().error( - "listener container 'message-converter' attribute contains empty value.", containerEle); - } - else { - configDef.getPropertyValues().add("messageConverter", - new RuntimeBeanReference(messageConverter)); - } - } - - propertyValues.add("activationSpecConfig", configDef); - - return propertyValues; + return properties; } } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerFactory.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerFactory.java index ed21f17f9a3..5bdd4695115 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerFactory.java @@ -33,6 +33,6 @@ public interface JmsListenerContainerFactory * @param endpoint the endpoint to configure * @return the created container */ - C createMessageListenerContainer(JmsListenerEndpoint endpoint); + C createListenerContainer(JmsListenerEndpoint endpoint); } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java index 38eacd25846..e2b45004117 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java @@ -22,7 +22,6 @@ import org.w3c.dom.Element; import org.springframework.beans.MutablePropertyValues; import org.springframework.beans.PropertyValues; -import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.ParserContext; @@ -57,43 +56,39 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser { private static final String BACK_OFF_ATTRIBUTE = "back-off"; - protected PropertyValues parseProperties(Element containerEle, ParserContext parserContext) { - final MutablePropertyValues properties = new MutablePropertyValues(); - PropertyValues commonValues = parseCommonContainerProperties(containerEle, parserContext); - PropertyValues containerValues = parseContainerProperties(containerEle, - parserContext, isSimpleContainer(containerEle)); - properties.addPropertyValues(commonValues); - properties.addPropertyValues(containerValues); - return properties; - } - @Override - protected RootBeanDefinition createContainerFactory(String factoryId, Element containerEle, PropertyValues propertyValues) { - RootBeanDefinition beanDefinition = new RootBeanDefinition(); + protected RootBeanDefinition createContainerFactory(String factoryId, Element containerEle, ParserContext parserContext, + PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) { + + RootBeanDefinition factoryDef = new RootBeanDefinition(); + String containerType = containerEle.getAttribute(CONTAINER_TYPE_ATTRIBUTE); String containerClass = containerEle.getAttribute(CONTAINER_CLASS_ATTRIBUTE); if (!"".equals(containerClass)) { return null; // Not supported } else if ("".equals(containerType) || containerType.startsWith("default")) { - beanDefinition.setBeanClassName("org.springframework.jms.config.DefaultJmsListenerContainerFactory"); + factoryDef.setBeanClassName("org.springframework.jms.config.DefaultJmsListenerContainerFactory"); } else if (containerType.startsWith("simple")) { - beanDefinition.setBeanClassName("org.springframework.jms.config.SimpleJmsListenerContainerFactory"); + factoryDef.setBeanClassName("org.springframework.jms.config.SimpleJmsListenerContainerFactory"); } - beanDefinition.getPropertyValues().addPropertyValues(propertyValues); - return beanDefinition; + + factoryDef.getPropertyValues().addPropertyValues(commonContainerProperties); + factoryDef.getPropertyValues().addPropertyValues(specificContainerProperties); + + return factoryDef; } @Override - protected BeanDefinition createContainer(ListenerContainerParserContext context) { + protected RootBeanDefinition createContainer(Element containerEle, Element listenerEle, ParserContext parserContext, + PropertyValues commonContainerProperties, PropertyValues specificContainerProperties) { + RootBeanDefinition containerDef = new RootBeanDefinition(); - containerDef.setSource(context.getSource()); + containerDef.setSource(parserContext.extractSource(containerEle)); + containerDef.getPropertyValues().addPropertyValues(commonContainerProperties); + containerDef.getPropertyValues().addPropertyValues(specificContainerProperties); - // Set all container values - containerDef.getPropertyValues().addPropertyValues(context.getContainerValues()); - - Element containerEle = context.getContainerElement(); String containerType = containerEle.getAttribute(CONTAINER_TYPE_ATTRIBUTE); String containerClass = containerEle.getAttribute(CONTAINER_CLASS_ATTRIBUTE); if (!"".equals(containerClass)) { @@ -106,29 +101,22 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser { containerDef.setBeanClassName("org.springframework.jms.listener.SimpleMessageListenerContainer"); } else { - context.getParserContext().getReaderContext().error( + parserContext.getReaderContext().error( "Invalid 'container-type' attribute: only \"default\" and \"simple\" supported.", containerEle); } - String phase = containerEle.getAttribute(PHASE_ATTRIBUTE); - if (StringUtils.hasText(phase)) { - containerDef.getPropertyValues().add("phase", phase); - } - // Parse listener specific settings - parseListenerConfiguration(context.getListenerElement(), context.getParserContext(), containerDef); + parseListenerConfiguration(listenerEle, parserContext, containerDef.getPropertyValues()); return containerDef; } @Override - protected boolean indicatesPubSub(PropertyValues propertyValues) { - return indicatesPubSubConfig(propertyValues); - } + protected MutablePropertyValues parseSpecificContainerProperties(Element containerEle, ParserContext parserContext) { + MutablePropertyValues properties = new MutablePropertyValues(); + + boolean isSimpleContainer = containerEle.getAttribute(CONTAINER_TYPE_ATTRIBUTE).startsWith("simple"); - private PropertyValues parseContainerProperties(Element containerEle, - ParserContext parserContext, boolean isSimpleContainer) { - MutablePropertyValues propertyValues = new MutablePropertyValues(); String connectionFactoryBeanName = "connectionFactory"; if (containerEle.hasAttribute(CONNECTION_FACTORY_ATTRIBUTE)) { connectionFactoryBeanName = containerEle.getAttribute(CONNECTION_FACTORY_ATTRIBUTE); @@ -138,38 +126,22 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser { } } if (StringUtils.hasText(connectionFactoryBeanName)) { - propertyValues.add("connectionFactory", - new RuntimeBeanReference(connectionFactoryBeanName)); + properties.add("connectionFactory", new RuntimeBeanReference(connectionFactoryBeanName)); } String taskExecutorBeanName = containerEle.getAttribute(TASK_EXECUTOR_ATTRIBUTE); if (StringUtils.hasText(taskExecutorBeanName)) { - propertyValues.add("taskExecutor", - new RuntimeBeanReference(taskExecutorBeanName)); + properties.add("taskExecutor", new RuntimeBeanReference(taskExecutorBeanName)); } String errorHandlerBeanName = containerEle.getAttribute(ERROR_HANDLER_ATTRIBUTE); if (StringUtils.hasText(errorHandlerBeanName)) { - propertyValues.add("errorHandler", - new RuntimeBeanReference(errorHandlerBeanName)); - } - - if (containerEle.hasAttribute(MESSAGE_CONVERTER_ATTRIBUTE)) { - String messageConverter = containerEle.getAttribute(MESSAGE_CONVERTER_ATTRIBUTE); - if (!StringUtils.hasText(messageConverter)) { - parserContext.getReaderContext().error( - "listener container 'message-converter' attribute contains empty value.", containerEle); - } - else { - propertyValues.add("messageConverter", - new RuntimeBeanReference(messageConverter)); - } + properties.add("errorHandler", new RuntimeBeanReference(errorHandlerBeanName)); } String destinationResolverBeanName = containerEle.getAttribute(DESTINATION_RESOLVER_ATTRIBUTE); if (StringUtils.hasText(destinationResolverBeanName)) { - propertyValues.add("destinationResolver", - new RuntimeBeanReference(destinationResolverBeanName)); + properties.add("destinationResolver", new RuntimeBeanReference(destinationResolverBeanName)); } String cache = containerEle.getAttribute(CACHE_ATTRIBUTE); @@ -182,17 +154,17 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser { } } else { - propertyValues.add("cacheLevelName", "CACHE_" + cache.toUpperCase()); + properties.add("cacheLevelName", "CACHE_" + cache.toUpperCase()); } } Integer acknowledgeMode = parseAcknowledgeMode(containerEle, parserContext); if (acknowledgeMode != null) { if (acknowledgeMode == Session.SESSION_TRANSACTED) { - propertyValues.add("sessionTransacted", Boolean.TRUE); + properties.add("sessionTransacted", Boolean.TRUE); } else { - propertyValues.add("sessionAcknowledgeMode", acknowledgeMode); + properties.add("sessionAcknowledgeMode", acknowledgeMode); } } @@ -203,51 +175,50 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser { "'transaction-manager' attribute not supported for listener container of type \"simple\".", containerEle); } else { - propertyValues.add("transactionManager", - new RuntimeBeanReference(transactionManagerBeanName)); + properties.add("transactionManager", new RuntimeBeanReference(transactionManagerBeanName)); } } String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE); if (StringUtils.hasText(concurrency)) { - propertyValues.add("concurrency", concurrency); + properties.add("concurrency", concurrency); } String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE); if (StringUtils.hasText(prefetch)) { if (!isSimpleContainer) { - propertyValues.add("maxMessagesPerTask", prefetch); + properties.add("maxMessagesPerTask", prefetch); } } + String phase = containerEle.getAttribute(PHASE_ATTRIBUTE); + if (StringUtils.hasText(phase)) { + properties.add("phase", phase); + } + String receiveTimeout = containerEle.getAttribute(RECEIVE_TIMEOUT_ATTRIBUTE); if (StringUtils.hasText(receiveTimeout)) { if (!isSimpleContainer) { - propertyValues.add("receiveTimeout", receiveTimeout); + properties.add("receiveTimeout", receiveTimeout); } } String backOffBeanName = containerEle.getAttribute(BACK_OFF_ATTRIBUTE); if (StringUtils.hasText(backOffBeanName)) { if (!isSimpleContainer) { - propertyValues.add("backOff", new RuntimeBeanReference(backOffBeanName)); + properties.add("backOff", new RuntimeBeanReference(backOffBeanName)); } } else { // No need to consider this if back-off is set String recoveryInterval = containerEle.getAttribute(RECOVERY_INTERVAL_ATTRIBUTE); if (StringUtils.hasText(recoveryInterval)) { if (!isSimpleContainer) { - propertyValues.add("recoveryInterval", recoveryInterval); + properties.add("recoveryInterval", recoveryInterval); } } } - return propertyValues; - } - - private boolean isSimpleContainer(Element containerEle) { - String containerType = containerEle.getAttribute(CONTAINER_TYPE_ATTRIBUTE); - return containerType.startsWith("simple"); + return properties; } } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpoint.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpoint.java index afbd67056f3..12baa9af819 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpoint.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpoint.java @@ -41,8 +41,8 @@ public interface JmsListenerEndpoint { * setting the {@code destination} and the {@code messageListener} to * use but an implementation may override any default setting that * was already set. - * @param container the container to configure + * @param listenerContainer the listener container to configure */ - void setupMessageContainer(MessageListenerContainer container); + void setupListenerContainer(MessageListenerContainer listenerContainer); } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java index 73e0f964cd3..75f24e02dd2 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java @@ -44,8 +44,8 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In private ApplicationContext applicationContext; - private final List endpointDescriptors - = new ArrayList(); + private final List endpointDescriptors = + new ArrayList(); /** * Set the {@link JmsListenerEndpointRegistry} instance to use. @@ -73,11 +73,10 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In } /** - * Set the {@link JmsListenerContainerFactory} to use in case a - * {@link JmsListenerEndpoint} is registered with a {@code null} container - * factory. - *

Alternatively, the bean name of the {@link JmsListenerContainerFactory} - * to use can be specified for a lazy lookup, see {@see #setContainerFactoryBeanName} + * Set the {@link JmsListenerContainerFactory} to use in case a {@link JmsListenerEndpoint} + * is registered with a {@code null} container factory. + *

Alternatively, the bean name of the {@link JmsListenerContainerFactory} to use + * can be specified for a lazy lookup, see {@link #setContainerFactoryBeanName}. */ public void setContainerFactory(JmsListenerContainerFactory containerFactory) { this.containerFactory = containerFactory; @@ -108,10 +107,10 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In } /** - * Register a new {@link JmsListenerEndpoint} alongside the {@link JmsListenerContainerFactory} - * to use to create the underlying container. - *

The {@code factory} may be {@code null} if the default factory has to be used for that - * endpoint. + * Register a new {@link JmsListenerEndpoint} alongside the + * {@link JmsListenerContainerFactory} to use to create the underlying container. + *

The {@code factory} may be {@code null} if the default factory has to be + * used for that endpoint. */ public void registerEndpoint(JmsListenerEndpoint endpoint, JmsListenerContainerFactory factory) { Assert.notNull(endpoint, "Endpoint must be set"); @@ -121,9 +120,8 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In } /** - * Register a new {@link JmsListenerEndpoint} using the default {@link JmsListenerContainerFactory} - * to create the underlying container. - * + * Register a new {@link JmsListenerEndpoint} using the default + * {@link JmsListenerContainerFactory} to create the underlying container. * @see #setContainerFactory(JmsListenerContainerFactory) * @see #registerEndpoint(JmsListenerEndpoint, JmsListenerContainerFactory) */ @@ -139,7 +137,7 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In protected void startAllEndpoints() throws Exception { for (JmsListenerEndpointDescriptor descriptor : endpointDescriptors) { - endpointRegistry.createJmsListenerContainer( + endpointRegistry.registerListenerContainer( descriptor.endpoint, resolveContainerFactory(descriptor)); } } @@ -152,25 +150,25 @@ public class JmsListenerEndpointRegistrar implements ApplicationContextAware, In return this.containerFactory; } else if (this.containerFactoryBeanName != null) { - this.containerFactory = applicationContext.getBean( + this.containerFactory = this.applicationContext.getBean( this.containerFactoryBeanName, JmsListenerContainerFactory.class); - return this.containerFactory; // Consider changing this if live change of the factory is required + return this.containerFactory; // Consider changing this if live change of the factory is required } else { - throw new IllegalStateException("Could not resolve the " - + JmsListenerContainerFactory.class.getSimpleName() + " to use for [" - + descriptor.endpoint + "] no factory was given and no default is set."); + throw new IllegalStateException("Could not resolve the " + + JmsListenerContainerFactory.class.getSimpleName() + " to use for [" + + descriptor.endpoint + "] no factory was given and no default is set."); } } private static class JmsListenerEndpointDescriptor { - private final JmsListenerEndpoint endpoint; - private final JmsListenerContainerFactory containerFactory; + public final JmsListenerEndpoint endpoint; - private JmsListenerEndpointDescriptor(JmsListenerEndpoint endpoint, - JmsListenerContainerFactory containerFactory) { + public final JmsListenerContainerFactory containerFactory; + + public JmsListenerEndpointDescriptor(JmsListenerEndpoint endpoint, JmsListenerContainerFactory containerFactory) { this.endpoint = endpoint; this.containerFactory = containerFactory; } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java index 20e6026197d..994c9f18cb4 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java @@ -18,39 +18,49 @@ package org.springframework.jms.config; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.SmartLifecycle; import org.springframework.jms.listener.MessageListenerContainer; import org.springframework.util.Assert; /** - * Create the necessary {@link MessageListenerContainer} instances - * for the registered {@linkplain JmsListenerEndpoint endpoints}. Also - * manage the lifecycle of the containers, in particular with the - * lifecycle of the application context. + * Creates the necessary {@link MessageListenerContainer} instances for the + * registered {@linkplain JmsListenerEndpoint endpoints}. Also manages the + * lifecycle of the listener containers, in particular within the lifecycle + * of the application context. * - *

Contrary to {@link MessageListenerContainer} created manually, - * containers managed by this instances are not registered in the - * application context and are not candidates for autowiring. Use - * {@link #getContainers()} if you need to access the containers - * of this instance for management purposes. If you need to access - * a particular container, use {@link #getContainer(String)} with the - * id of the endpoint. + *

Contrary to {@link MessageListenerContainer}s created manually, listener + * containers managed by registry are not beans in the application context and + * are not candidates for autowiring. Use {@link #getListenerContainers()} if + * you need to access this registry's listener containers for management purposes. + * If you need to access to a specific message listener container, use + * {@link #getListenerContainer(String)} with the id of the endpoint. * * @author Stephane Nicoll + * @author Juergen Hoeller * @since 4.1 * @see JmsListenerEndpoint * @see MessageListenerContainer * @see JmsListenerContainerFactory */ -public class JmsListenerEndpointRegistry implements DisposableBean { +public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecycle { + + protected final Log logger = LogFactory.getLog(getClass()); + + private final Map listenerContainers = + new LinkedHashMap(); + + private int phase = Integer.MAX_VALUE; - private final Map containers = - new HashMap(); /** * Return the {@link MessageListenerContainer} with the specified id or @@ -59,72 +69,149 @@ public class JmsListenerEndpointRegistry implements DisposableBean { * @return the container or {@code null} if no container with that id exists * @see JmsListenerEndpoint#getId() */ - public MessageListenerContainer getContainer(String id) { - Assert.notNull(id, "the container identifier must be set."); - return containers.get(id); + public MessageListenerContainer getListenerContainer(String id) { + Assert.notNull(id, "Container identifier must not be null"); + return this.listenerContainers.get(id); } /** * Return the managed {@link MessageListenerContainer} instance(s). */ - public Collection getContainers() { - return Collections.unmodifiableCollection(containers.values()); + public Collection getListenerContainers() { + return Collections.unmodifiableCollection(this.listenerContainers.values()); } + /** * Create a message listener container for the given {@link JmsListenerEndpoint}. *

This create the necessary infrastructure to honor that endpoint * with regards to its configuration. * @param endpoint the endpoint to add - * @see #getContainers() - * @see #getContainer(String) + * @see #getListenerContainers() + * @see #getListenerContainer(String) */ - public void createJmsListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory factory) { + public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory factory) { Assert.notNull(endpoint, "Endpoint must not be null"); Assert.notNull(factory, "Factory must not be null"); String id = endpoint.getId(); Assert.notNull(id, "Endpoint id must not be null"); - Assert.state(!containers.containsKey(id), "another endpoint is already " + - "registered with id '" + id + "'"); + Assert.state(!this.listenerContainers.containsKey(id), + "Another endpoint is already registered with id '" + id + "'"); - MessageListenerContainer container = doCreateJmsListenerContainer(endpoint, factory); - containers.put(id, container); + MessageListenerContainer container = createListenerContainer(endpoint, factory); + this.listenerContainers.put(id, container); } /** * Create and start a new container using the specified factory. */ - protected MessageListenerContainer doCreateJmsListenerContainer(JmsListenerEndpoint endpoint, + protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory factory) { - MessageListenerContainer container = factory.createMessageListenerContainer(endpoint); - initializeContainer(container); - return container; + + MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); + + if (listenerContainer instanceof InitializingBean) { + try { + ((InitializingBean) listenerContainer).afterPropertiesSet(); + } + catch (Exception ex) { + throw new BeanInitializationException("Failed to initialize message listener container", ex); + } + } + + int containerPhase = listenerContainer.getPhase(); + if (containerPhase < Integer.MAX_VALUE) { // a custom phase value + if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) { + throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " + + this.phase + " vs " + containerPhase); + } + this.phase = listenerContainer.getPhase(); + } + + return listenerContainer; + } + + + @Override + public void destroy() { + for (MessageListenerContainer listenerContainer : getListenerContainers()) { + if (listenerContainer instanceof DisposableBean) { + try { + ((DisposableBean) listenerContainer).destroy(); + } + catch (Throwable ex) { + logger.warn("Failed to destroy message listener container", ex); + } + } + } + } + + + // Delegating implementation of SmartLifecycle + + @Override + public int getPhase() { + return this.phase; } @Override - public void destroy() throws Exception { - for (MessageListenerContainer container : getContainers()) { - stopContainer(container); - } + public boolean isAutoStartup() { + return true; } - protected void initializeContainer(MessageListenerContainer container) { - container.start(); - if (container instanceof InitializingBean) { - try { - ((InitializingBean) container).afterPropertiesSet(); - } - catch (Exception e) { - throw new BeanInitializationException("Could not start message listener container", e); + @Override + public void start() { + for (MessageListenerContainer listenerContainer : getListenerContainers()) { + if (listenerContainer.isAutoStartup()) { + listenerContainer.start(); } } } - protected void stopContainer(MessageListenerContainer container) throws Exception { - container.stop(); - if (container instanceof DisposableBean) { - ((DisposableBean) container).destroy(); + @Override + public void stop() { + for (MessageListenerContainer listenerContainer : getListenerContainers()) { + listenerContainer.stop(); + } + } + + @Override + public void stop(Runnable callback) { + Collection listenerContainers = getListenerContainers(); + AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback); + for (MessageListenerContainer listenerContainer : listenerContainers) { + listenerContainer.stop(aggregatingCallback); + } + } + + @Override + public boolean isRunning() { + for (MessageListenerContainer listenerContainer : getListenerContainers()) { + if (listenerContainer.isRunning()) { + return true; + } + } + return false; + } + + + private static class AggregatingCallback implements Runnable { + + private final AtomicInteger count; + + private final Runnable finishCallback; + + public AggregatingCallback(int count, Runnable finishCallback) { + this.count = new AtomicInteger(count); + this.finishCallback = finishCallback; + } + + @Override + public void run() { + if (this.count.decrementAndGet() == 0) { + this.finishCallback.run(); + } } } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java b/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java index f245a2f5b65..25f79aea8a3 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java @@ -43,6 +43,7 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint { private JmsHandlerMethodFactory jmsHandlerMethodFactory; + /** * Set the object instance that should manage this endpoint. */ @@ -51,19 +52,18 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint { } public Object getBean() { - return bean; + return this.bean; } /** - * Set the method to invoke to process a message managed by this - * endpoint. + * Set the method to invoke to process a message managed by this endpoint. */ public void setMethod(Method method) { this.method = method; } public Method getMethod() { - return method; + return this.method; } /** @@ -75,13 +75,14 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint { this.jmsHandlerMethodFactory = jmsHandlerMethodFactory; } + @Override protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) { - Assert.state(jmsHandlerMethodFactory != null, - "Could not create message listener, message listener factory not set."); + Assert.state(this.jmsHandlerMethodFactory != null, + "Could not create message listener - message listener factory not set"); MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(); InvocableHandlerMethod invocableHandlerMethod = - jmsHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod()); + this.jmsHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod()); messageListener.setHandlerMethod(invocableHandlerMethod); String responseDestination = getDefaultResponseDestination(); if (StringUtils.hasText(responseDestination)) { @@ -122,12 +123,8 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint { @Override protected StringBuilder getEndpointDescription() { return super.getEndpointDescription() - .append(" | bean='") - .append(this.bean) - .append("'") - .append(" | method='") - .append(this.method) - .append("'"); + .append(" | bean='").append(this.bean).append("'") + .append(" | method='").append(this.method).append("'"); } } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/SimpleJmsListenerContainerFactory.java b/spring-jms/src/main/java/org/springframework/jms/config/SimpleJmsListenerContainerFactory.java index 32bd777006e..ccd7e50e4f9 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/SimpleJmsListenerContainerFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/SimpleJmsListenerContainerFactory.java @@ -19,8 +19,8 @@ package org.springframework.jms.config; import org.springframework.jms.listener.SimpleMessageListenerContainer; /** - * A {@link JmsListenerContainerFactory} implementation to build - * a {@link SimpleMessageListenerContainer}. + * A {@link JmsListenerContainerFactory} implementation to build a + * standard {@link SimpleMessageListenerContainer}. * * @author Stephane Nicoll * @since 4.1 diff --git a/spring-jms/src/main/java/org/springframework/jms/config/SimpleJmsListenerEndpoint.java b/spring-jms/src/main/java/org/springframework/jms/config/SimpleJmsListenerEndpoint.java index 2bc184065ea..9e98c26f40c 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/SimpleJmsListenerEndpoint.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/SimpleJmsListenerEndpoint.java @@ -31,17 +31,23 @@ public class SimpleJmsListenerEndpoint extends AbstractJmsListenerEndpoint { private MessageListener messageListener; + + /** + * Set the {@link MessageListener} to invoke when a message matching + * the endpoint is received. + */ + public void setMessageListener(MessageListener messageListener) { + this.messageListener = messageListener; + } + /** * Return the {@link MessageListener} to invoke when a message matching * the endpoint is received. */ public MessageListener getMessageListener() { - return messageListener; + return this.messageListener; } - public void setMessageListener(MessageListener messageListener) { - this.messageListener = messageListener; - } @Override protected MessageListener createMessageListener(MessageListenerContainer container) { @@ -51,9 +57,7 @@ public class SimpleJmsListenerEndpoint extends AbstractJmsListenerEndpoint { @Override protected StringBuilder getEndpointDescription() { return super.getEndpointDescription() - .append(" | messageListener='") - .append(this.messageListener) - .append("'"); + .append(" | messageListener='").append(this.messageListener).append("'"); } } diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java index e74dbe69126..c20da82856b 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java @@ -16,11 +16,14 @@ package org.springframework.jms.listener; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; @@ -29,7 +32,9 @@ import javax.jms.Topic; import org.springframework.jms.support.JmsUtils; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ErrorHandler; +import org.springframework.util.ReflectionUtils; /** * Abstract base class for message listener containers. Can either host @@ -126,6 +131,13 @@ import org.springframework.util.ErrorHandler; public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer implements MessageListenerContainer { + private static final Method createSharedConsumerMethod = ClassUtils.getMethodIfAvailable( + Session.class, "createSharedConsumer", Topic.class, String.class, String.class); + + private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable( + Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class); + + private volatile Object destination; private volatile String messageSelector; @@ -134,14 +146,18 @@ public abstract class AbstractMessageListenerContainer private boolean subscriptionDurable = false; - private String durableSubscriptionName; + private boolean subscriptionShared = false; + + private String subscriptionName; + + private boolean pubSubNoLocal = false; + + private MessageConverter messageConverter; private ExceptionListener exceptionListener; private ErrorHandler errorHandler; - private MessageConverter messageConverter; - private boolean exposeListenerSession = true; private boolean acceptMessagesWhileStopping = false; @@ -252,8 +268,8 @@ public abstract class AbstractMessageListenerContainer public void setMessageListener(Object messageListener) { checkMessageListener(messageListener); this.messageListener = messageListener; - if (this.durableSubscriptionName == null) { - this.durableSubscriptionName = getDefaultSubscriptionName(messageListener); + if (this.subscriptionName == null) { + this.subscriptionName = getDefaultSubscriptionName(messageListener); } } @@ -301,15 +317,20 @@ public abstract class AbstractMessageListenerContainer /** * Set whether to make the subscription durable. The durable subscription name - * to be used can be specified through the "durableSubscriptionName" property. + * to be used can be specified through the "subscriptionName" property. *

Default is "false". Set this to "true" to register a durable subscription, - * typically in combination with a "durableSubscriptionName" value (unless + * typically in combination with a "subscriptionName" value (unless * your message listener class name is good enough as subscription name). - *

Only makes sense when listening to a topic (pub-sub domain). - * @see #setDurableSubscriptionName + *

Only makes sense when listening to a topic (pub-sub domain), + * therefore this method switches the "pubSubDomain" flag as well. + * @see #setSubscriptionName + * @see #setPubSubDomain */ public void setSubscriptionDurable(boolean subscriptionDurable) { this.subscriptionDurable = subscriptionDurable; + if (subscriptionDurable) { + setPubSubDomain(true); + } } /** @@ -320,25 +341,108 @@ public abstract class AbstractMessageListenerContainer } /** - * Set the name of a durable subscription to create. To be applied in case - * of a topic (pub-sub domain) with subscription durability activated. + * Set whether to make the subscription shared. The shared subscription name + * to be used can be specified through the "subscriptionName" property. + *

Default is "false". Set this to "true" to register a shared subscription, + * typically in combination with a "subscriptionName" value (unless + * your message listener class name is good enough as subscription name). + * Note that shared subscriptions may also be durable, so this flag can + * (and often will) be combined with "subscriptionDurable" as well. + *

Only makes sense when listening to a topic (pub-sub domain), + * therefore this method switches the "pubSubDomain" flag as well. + *

Requires a JMS 2.0 compatible message broker. + * @see #setSubscriptionName + * @see #setSubscriptionDurable + * @see #setPubSubDomain + */ + public void setSubscriptionShared(boolean subscriptionShared) { + this.subscriptionShared = subscriptionShared; + if (subscriptionShared) { + setPubSubDomain(true); + } + } + + /** + * Return whether to make the subscription shared. + */ + public boolean isSubscriptionShared() { + return this.subscriptionShared; + } + + /** + * Set the name of a subscription to create. To be applied in case + * of a topic (pub-sub domain) with a shared or durable subscription. + *

The subscription name needs to be unique within this client's + * JMS client id. Default is the class name of the specified message listener. + *

Note: Only 1 concurrent consumer (which is the default of this + * message listener container) is allowed for each subscription, + * except for a shared subscription (which requires JMS 2.0). + * @see #setPubSubDomain + * @see #setSubscriptionDurable + * @see #setSubscriptionShared + * @see #setClientId + * @see #setMessageListener + */ + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + public String getSubscriptionName() { + return this.subscriptionName; + } + + /** + * Set the name of a durable subscription to create. This method switches + * to pub-sub domain mode and activates subscription durability as well. *

The durable subscription name needs to be unique within this client's * JMS client id. Default is the class name of the specified message listener. *

Note: Only 1 concurrent consumer (which is the default of this - * message listener container) is allowed for each durable subscription. + * message listener container) is allowed for each durable subscription, + * except for a shared durable subscription (which requires JMS 2.0). + * @see #setPubSubDomain * @see #setSubscriptionDurable + * @see #setSubscriptionShared * @see #setClientId * @see #setMessageListener */ public void setDurableSubscriptionName(String durableSubscriptionName) { - this.durableSubscriptionName = durableSubscriptionName; + this.subscriptionName = durableSubscriptionName; + this.subscriptionDurable = true; } /** * Return the name of a durable subscription to create, if any. */ public String getDurableSubscriptionName() { - return this.durableSubscriptionName; + return (this.subscriptionDurable ? this.subscriptionName : null); + } + + /** + * Set whether to inhibit the delivery of messages published by its own connection. + * Default is "false". + * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean) + */ + public void setPubSubNoLocal(boolean pubSubNoLocal) { + this.pubSubNoLocal = pubSubNoLocal; + } + + /** + * Return whether to inhibit the delivery of messages published by its own connection. + */ + public boolean isPubSubNoLocal() { + return this.pubSubNoLocal; + } + + /** + * Set the {@link MessageConverter} strategy for converting JMS Messages. + */ + public void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + @Override + public MessageConverter getMessageConverter() { + return this.messageConverter; } /** @@ -358,25 +462,21 @@ public abstract class AbstractMessageListenerContainer } /** - * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown - * while processing a Message. By default there will be no ErrorHandler - * so that error-level logging is the only result. + * Set the ErrorHandler to be invoked in case of any uncaught exceptions thrown + * while processing a Message. + *

By default, there will be no ErrorHandler so that error-level + * logging is the only result. */ public void setErrorHandler(ErrorHandler errorHandler) { this.errorHandler = errorHandler; } /** - * Set the {@link MessageConverter} strategy for converting JMS Messages. - * @param messageConverter the message converter to use + * Return the ErrorHandler to be invoked in case of any uncaught exceptions thrown + * while processing a Message. */ - public void setMessageConverter(MessageConverter messageConverter) { - this.messageConverter = messageConverter; - } - - @Override - public MessageConverter getMessageConverter() { - return messageConverter; + public ErrorHandler getErrorHandler() { + return this.errorHandler; } /** @@ -436,9 +536,6 @@ public abstract class AbstractMessageListenerContainer if (this.destination == null) { throw new IllegalArgumentException("Property 'destination' or 'destinationName' is required"); } - if (isSubscriptionDurable() && !isPubSubDomain()) { - throw new IllegalArgumentException("A durable subscription requires a topic (pub-sub domain)"); - } } @Override @@ -446,6 +543,7 @@ public abstract class AbstractMessageListenerContainer setMessageListener(messageListener); } + //------------------------------------------------------------------------- // Template methods for listener execution //------------------------------------------------------------------------- @@ -668,6 +766,51 @@ public abstract class AbstractMessageListenerContainer return isSessionTransacted(); } + /** + * Create a JMS MessageConsumer for the given Session and Destination. + *

This implementation uses JMS 1.1 API. + * @param session the JMS Session to create a MessageConsumer for + * @param destination the JMS Destination to create a MessageConsumer for + * @return the new JMS MessageConsumer + * @throws javax.jms.JMSException if thrown by JMS API methods + */ + protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException { + if (isPubSubDomain() && destination instanceof Topic) { + if (isSubscriptionShared()) { + // createSharedConsumer((Topic) dest, subscription, selector); + // createSharedDurableConsumer((Topic) dest, subscription, selector); + Method method = (isSubscriptionDurable() ? + createSharedDurableConsumerMethod : createSharedConsumerMethod); + try { + return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector()); + } + catch (InvocationTargetException ex) { + if (ex.getTargetException() instanceof JMSException) { + throw (JMSException) ex.getTargetException(); + } + ReflectionUtils.handleInvocationTargetException(ex); + return null; + } + catch (IllegalAccessException ex) { + throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage()); + } + } + else if (isSubscriptionDurable()) { + return session.createDurableSubscriber( + (Topic) destination, getSubscriptionName(), getMessageSelector(), isPubSubNoLocal()); + } + else { + // Only pass in the NoLocal flag in case of a Topic (pub-sub mode): + // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException + // in case of the NoLocal flag being specified for a Queue. + return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal()); + } + } + else { + return session.createConsumer(destination, getMessageSelector()); + } + } + /** * Handle the given exception that arose during listener execution. *

The default implementation logs the exception at warn level, @@ -714,8 +857,9 @@ public abstract class AbstractMessageListenerContainer * @see #setErrorHandler */ protected void invokeErrorHandler(Throwable ex) { - if (this.errorHandler != null) { - this.errorHandler.handleError(ex); + ErrorHandler errorHandler = getErrorHandler(); + if (errorHandler != null) { + errorHandler.handleError(ex); } else if (logger.isWarnEnabled()) { logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex); diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java index f2282310c40..a70817ffdb6 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java @@ -87,8 +87,6 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe private boolean sessionTransactedCalled = false; - private boolean pubSubNoLocal = false; - private PlatformTransactionManager transactionManager; private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition(); @@ -104,22 +102,6 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe this.sessionTransactedCalled = true; } - /** - * Set whether to inhibit the delivery of messages published by its own connection. - * Default is "false". - * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean) - */ - public void setPubSubNoLocal(boolean pubSubNoLocal) { - this.pubSubNoLocal = pubSubNoLocal; - } - - /** - * Return whether to inhibit the delivery of messages published by its own connection. - */ - protected boolean isPubSubNoLocal() { - return this.pubSubNoLocal; - } - /** * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager} * to use for transactional wrapping of message reception plus listener execution. @@ -452,11 +434,6 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe protected void noMessageReceived(Object invoker, Session session) { } - - //------------------------------------------------------------------------- - // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2 - //------------------------------------------------------------------------- - /** * Fetch an appropriate Connection from the given JmsResourceHolder. *

This implementation accepts any JMS 1.1 Connection. @@ -479,32 +456,6 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe return holder.getSession(); } - /** - * Create a JMS MessageConsumer for the given Session and Destination. - *

This implementation uses JMS 1.1 API. - * @param session the JMS Session to create a MessageConsumer for - * @param destination the JMS Destination to create a MessageConsumer for - * @return the new JMS MessageConsumer - * @throws javax.jms.JMSException if thrown by JMS API methods - */ - protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException { - // Only pass in the NoLocal flag in case of a Topic: - // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException - // in case of the NoLocal flag being specified for a Queue. - if (isPubSubDomain()) { - if (isSubscriptionDurable() && destination instanceof Topic) { - return session.createDurableSubscriber( - (Topic) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal()); - } - else { - return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal()); - } - } - else { - return session.createConsumer(destination, getMessageSelector()); - } - } - /** * ResourceFactory implementation that delegates to this listener container's protected callback methods. diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java index d38c7b7b6a8..9d77fbc57d4 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java @@ -16,7 +16,7 @@ package org.springframework.jms.listener; -import org.springframework.context.Lifecycle; +import org.springframework.context.SmartLifecycle; import org.springframework.jms.support.converter.MessageConverter; /** @@ -27,7 +27,7 @@ import org.springframework.jms.support.converter.MessageConverter; * @author Stephane Nicoll * @since 4.1 */ -public interface MessageListenerContainer extends Lifecycle { +public interface MessageListenerContainer extends SmartLifecycle { /** * Setup the message listener to use. Throws an {@link IllegalArgumentException} diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java index 7f013082bae..019c553b9ef 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; -import javax.jms.Topic; import org.springframework.jms.support.JmsUtils; import org.springframework.transaction.support.TransactionSynchronizationManager; @@ -60,8 +59,6 @@ import org.springframework.util.Assert; */ public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements ExceptionListener { - private boolean pubSubNoLocal = false; - private boolean connectLazily = false; private int concurrentConsumers = 1; @@ -75,22 +72,6 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private final Object consumersMonitor = new Object(); - /** - * Set whether to inhibit the delivery of messages published by its own connection. - * Default is "false". - * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean) - */ - public void setPubSubNoLocal(boolean pubSubNoLocal) { - this.pubSubNoLocal = pubSubNoLocal; - } - - /** - * Return whether to inhibit the delivery of messages published by its own connection. - */ - protected boolean isPubSubNoLocal() { - return this.pubSubNoLocal; - } - /** * Specify whether to connect lazily, i.e. whether to establish the JMS Connection * and the corresponding Sessions and MessageConsumers as late as possible - @@ -370,35 +351,4 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta } } - - //------------------------------------------------------------------------- - // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2 - //------------------------------------------------------------------------- - - /** - * Create a JMS MessageConsumer for the given Session and Destination. - *

This implementation uses JMS 1.1 API. - * @param session the JMS Session to create a MessageConsumer for - * @param destination the JMS Destination to create a MessageConsumer for - * @return the new JMS MessageConsumer - * @throws JMSException if thrown by JMS API methods - */ - protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException { - // Only pass in the NoLocal flag in case of a Topic: - // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException - // in case of the NoLocal flag being specified for a Queue. - if (isPubSubDomain()) { - if (isSubscriptionDurable() && destination instanceof Topic) { - return session.createDurableSubscriber( - (Topic) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal()); - } - else { - return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal()); - } - } - else { - return session.createConsumer(destination, getMessageSelector()); - } - } - } diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java index f4e8d6f3734..f910fd43082 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java @@ -269,7 +269,7 @@ public abstract class AbstractAdaptableMessageListener MessageConverter converter = getMessageConverter(); if (converter != null) { if (result instanceof org.springframework.messaging.Message) { - return messagingMessageConverter.toMessage(result, session); + return this.messagingMessageConverter.toMessage(result, session); } else { return converter.toMessage(result, session); diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/DefaultJmsActivationSpecFactory.java b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/DefaultJmsActivationSpecFactory.java index 85c9179298c..b4363002f4e 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/DefaultJmsActivationSpecFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/DefaultJmsActivationSpecFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ import org.springframework.beans.BeanWrapper; /** * Default implementation of the {@link JmsActivationSpecFactory} interface. - * Supports the standard JMS properties as defined by the JMS 1.5 specification, + * Supports the standard JMS properties as defined by the JCA 1.5 specification, * as well as Spring's extended "maxConcurrency" and "prefetchSize" settings * through autodetection of well-known vendor-specific provider properties. * @@ -158,7 +158,7 @@ public class DefaultJmsActivationSpecFactory extends StandardJmsActivationSpecFa // JORAM bw.setPropertyValue("maxMessages", Integer.toString(config.getPrefetchSize())); } - else if(bw.isWritableProperty("maxBatchSize")){ + else if (bw.isWritableProperty("maxBatchSize")){ // WebSphere bw.setPropertyValue("maxBatchSize", Integer.toString(config.getPrefetchSize())); } diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java index 5a461992883..fc2fdf85510 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java @@ -48,7 +48,9 @@ public class JmsActivationSpecConfig { private boolean subscriptionDurable = false; - private String durableSubscriptionName; + private boolean subscriptionShared = false; + + private String subscriptionName; private String clientId; @@ -81,18 +83,41 @@ public class JmsActivationSpecConfig { public void setSubscriptionDurable(boolean subscriptionDurable) { this.subscriptionDurable = subscriptionDurable; + if (subscriptionDurable) { + this.pubSubDomain = true; + } } public boolean isSubscriptionDurable() { return this.subscriptionDurable; } + public void setSubscriptionShared(boolean subscriptionShared) { + this.subscriptionShared = subscriptionShared; + if (subscriptionShared) { + this.pubSubDomain = true; + } + } + + public boolean isSubscriptionShared() { + return this.subscriptionShared; + } + + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + public String getSubscriptionName() { + return this.subscriptionName; + } + public void setDurableSubscriptionName(String durableSubscriptionName) { - this.durableSubscriptionName = durableSubscriptionName; + this.subscriptionName = durableSubscriptionName; + this.subscriptionDurable = true; } public String getDurableSubscriptionName() { - return this.durableSubscriptionName; + return (this.subscriptionDurable ? this.subscriptionName : null); } public void setClientId(String clientId) { @@ -217,7 +242,7 @@ public class JmsActivationSpecConfig { * Return the {@link MessageConverter} to use, if any. */ public MessageConverter getMessageConverter() { - return messageConverter; + return this.messageConverter; } } diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java index 24b8f6ba323..a82524b3873 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java @@ -144,7 +144,42 @@ public class JmsMessageEndpointManager extends GenericMessageEndpointManager * should use for activating its listener. Return {@code null} if none is set. */ public JmsActivationSpecConfig getActivationSpecConfig() { - return activationSpecConfig; + return this.activationSpecConfig; + } + + /** + * Set the name of this message endpoint. Populated with the bean name + * automatically when defined within Spring's bean factory. + */ + @Override + public void setBeanName(String beanName) { + this.endpointFactory.setBeanName(beanName); + } + + + @Override + public void afterPropertiesSet() throws ResourceException { + if (this.messageListenerSet) { + setMessageEndpointFactory(this.endpointFactory); + } + if (this.activationSpecConfig != null) { + setActivationSpec( + this.activationSpecFactory.createActivationSpec(getResourceAdapter(), this.activationSpecConfig)); + } + super.afterPropertiesSet(); + } + + + @Override + public void setupMessageListener(Object messageListener) { + if (messageListener instanceof MessageListener) { + setMessageListener((MessageListener) messageListener); + } + else { + throw new IllegalArgumentException("Unsupported message listener '" + + messageListener.getClass().getName() + "': only '" + MessageListener.class.getName() + + "' type is supported"); + } } @Override @@ -162,40 +197,7 @@ public class JmsMessageEndpointManager extends GenericMessageEndpointManager if (config != null) { return config.isPubSubDomain(); } - throw new IllegalStateException("could not determine pubSubDomain, no activation spec config is set"); - } - - /** - * Set the name of this message endpoint. Populated with the bean name - * automatically when defined within Spring's bean factory. - */ - @Override - public void setBeanName(String beanName) { - this.endpointFactory.setBeanName(beanName); - } - - @Override - public void setupMessageListener(Object messageListener) { - if (messageListener instanceof MessageListener) { - setMessageListener((MessageListener) messageListener); - } - else { - throw new IllegalArgumentException("Unsupported message listener '" - + messageListener.getClass().getName() + "': only '" - + MessageListener.class.getName() + "' type is supported"); - } - } - - @Override - public void afterPropertiesSet() throws ResourceException { - if (this.messageListenerSet) { - setMessageEndpointFactory(this.endpointFactory); - } - if (this.activationSpecConfig != null) { - setActivationSpec( - this.activationSpecFactory.createActivationSpec(getResourceAdapter(), this.activationSpecConfig)); - } - super.afterPropertiesSet(); + throw new IllegalStateException("Could not determine pubSubDomain - no activation spec config is set"); } } diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/StandardJmsActivationSpecFactory.java b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/StandardJmsActivationSpecFactory.java index fafb5e335c2..e90f2bdc456 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/StandardJmsActivationSpecFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/StandardJmsActivationSpecFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -153,17 +153,19 @@ public class StandardJmsActivationSpecFactory implements JmsActivationSpecFactor throw new IllegalArgumentException( "Durable subscriptions not supported by underlying provider: " + this.activationSpecClass.getName()); } - if (config.getDurableSubscriptionName() != null) { - bw.setPropertyValue("subscriptionName", config.getDurableSubscriptionName()); + if (config.isSubscriptionShared()) { + throw new IllegalArgumentException("Shared subscriptions not supported for JCA-driven endpoints"); + } + + if (config.getSubscriptionName() != null) { + bw.setPropertyValue("subscriptionName", config.getSubscriptionName()); } if (config.getClientId() != null) { bw.setPropertyValue("clientId", config.getClientId()); } - if (config.getMessageSelector() != null) { bw.setPropertyValue("messageSelector", config.getMessageSelector()); } - applyAcknowledgeMode(bw, config.getAcknowledgeMode()); } diff --git a/spring-jms/src/main/resources/org/springframework/jms/config/spring-jms-4.1.xsd b/spring-jms/src/main/resources/org/springframework/jms/config/spring-jms-4.1.xsd index 8c26ec77eb1..e61f4793050 100644 --- a/spring-jms/src/main/resources/org/springframework/jms/config/spring-jms-4.1.xsd +++ b/spring-jms/src/main/resources/org/springframework/jms/config/spring-jms-4.1.xsd @@ -218,8 +218,8 @@ @@ -227,6 +227,8 @@ + + @@ -234,7 +236,7 @@ diff --git a/spring-jms/src/test/java/org/springframework/jms/annotation/AbstractJmsAnnotationDrivenTests.java b/spring-jms/src/test/java/org/springframework/jms/annotation/AbstractJmsAnnotationDrivenTests.java index d5899210c2e..7c57b3f76e9 100644 --- a/spring-jms/src/test/java/org/springframework/jms/annotation/AbstractJmsAnnotationDrivenTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/annotation/AbstractJmsAnnotationDrivenTests.java @@ -76,8 +76,8 @@ public abstract class AbstractJmsAnnotationDrivenTests { context.getBean("jmsListenerContainerFactory", JmsListenerContainerTestFactory.class); JmsListenerContainerTestFactory simpleFactory = context.getBean("simpleFactory", JmsListenerContainerTestFactory.class); - assertEquals(1, defaultFactory.getContainers().size()); - assertEquals(1, simpleFactory.getContainers().size()); + assertEquals(1, defaultFactory.getListenerContainers().size()); + assertEquals(1, simpleFactory.getListenerContainers().size()); } @Component @@ -100,9 +100,9 @@ public abstract class AbstractJmsAnnotationDrivenTests { public void testFullConfiguration(ApplicationContext context) { JmsListenerContainerTestFactory simpleFactory = context.getBean("simpleFactory", JmsListenerContainerTestFactory.class); - assertEquals(1, simpleFactory.getContainers().size()); + assertEquals(1, simpleFactory.getListenerContainers().size()); MethodJmsListenerEndpoint endpoint = (MethodJmsListenerEndpoint) - simpleFactory.getContainers().get(0).getEndpoint(); + simpleFactory.getListenerContainers().get(0).getEndpoint(); assertEquals("listener1", endpoint.getId()); assertEquals("queueIn", endpoint.getDestination()); assertEquals("mySelector", endpoint.getSelector()); @@ -129,9 +129,9 @@ public abstract class AbstractJmsAnnotationDrivenTests { context.getBean("jmsListenerContainerFactory", JmsListenerContainerTestFactory.class); JmsListenerContainerTestFactory customFactory = context.getBean("customFactory", JmsListenerContainerTestFactory.class); - assertEquals(1, defaultFactory.getContainers().size()); - assertEquals(1, customFactory.getContainers().size()); - JmsListenerEndpoint endpoint = defaultFactory.getContainers().get(0).getEndpoint(); + assertEquals(1, defaultFactory.getListenerContainers().size()); + assertEquals(1, customFactory.getListenerContainers().size()); + JmsListenerEndpoint endpoint = defaultFactory.getListenerContainers().get(0).getEndpoint(); assertEquals("Wrong endpoint type", SimpleJmsListenerEndpoint.class, endpoint.getClass()); assertEquals("Wrong listener set in custom endpoint", context.getBean("simpleMessageListener"), ((SimpleJmsListenerEndpoint) endpoint).getMessageListener()); @@ -139,11 +139,11 @@ public abstract class AbstractJmsAnnotationDrivenTests { JmsListenerEndpointRegistry customRegistry = context.getBean("customRegistry", JmsListenerEndpointRegistry.class); assertEquals("Wrong number of containers in the registry", 2, - customRegistry.getContainers().size()); + customRegistry.getListenerContainers().size()); assertNotNull("Container with custom id on the annotation should be found", - customRegistry.getContainer("listenerId")); + customRegistry.getListenerContainer("listenerId")); assertNotNull("Container created with custom id should be found", - customRegistry.getContainer("myCustomEndpointId")); + customRegistry.getListenerContainer("myCustomEndpointId")); } @Component @@ -162,7 +162,7 @@ public abstract class AbstractJmsAnnotationDrivenTests { public void testExplicitContainerFactoryConfiguration(ApplicationContext context) { JmsListenerContainerTestFactory defaultFactory = context.getBean("simpleFactory", JmsListenerContainerTestFactory.class); - assertEquals(1, defaultFactory.getContainers().size()); + assertEquals(1, defaultFactory.getListenerContainers().size()); } /** @@ -172,7 +172,7 @@ public abstract class AbstractJmsAnnotationDrivenTests { public void testDefaultContainerFactoryConfiguration(ApplicationContext context) { JmsListenerContainerTestFactory defaultFactory = context.getBean("jmsListenerContainerFactory", JmsListenerContainerTestFactory.class); - assertEquals(1, defaultFactory.getContainers().size()); + assertEquals(1, defaultFactory.getListenerContainers().size()); } static class DefaultBean { @@ -191,12 +191,12 @@ public abstract class AbstractJmsAnnotationDrivenTests { public void testJmsHandlerMethodFactoryConfiguration(ApplicationContext context) throws JMSException { JmsListenerContainerTestFactory simpleFactory = context.getBean("defaultFactory", JmsListenerContainerTestFactory.class); - assertEquals(1, simpleFactory.getContainers().size()); + assertEquals(1, simpleFactory.getListenerContainers().size()); MethodJmsListenerEndpoint endpoint = (MethodJmsListenerEndpoint) - simpleFactory.getContainers().get(0).getEndpoint(); + simpleFactory.getListenerContainers().get(0).getEndpoint(); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); - endpoint.setupMessageContainer(container); + endpoint.setupListenerContainer(container); MessagingMessageListenerAdapter listener = (MessagingMessageListenerAdapter) container.getMessageListener(); listener.onMessage(new StubTextMessage("failValidation"), mock(Session.class)); } diff --git a/spring-jms/src/test/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessorTests.java b/spring-jms/src/test/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessorTests.java index 40f7b3f0afa..ec9a3565aa1 100644 --- a/spring-jms/src/test/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessorTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessorTests.java @@ -16,8 +16,6 @@ package org.springframework.jms.annotation; -import static org.junit.Assert.*; - import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @@ -38,47 +36,51 @@ import org.springframework.jms.config.MethodJmsListenerEndpoint; import org.springframework.jms.listener.SimpleMessageListenerContainer; import org.springframework.stereotype.Component; +import static org.junit.Assert.*; + /** - * * @author Stephane Nicoll */ public class JmsListenerAnnotationBeanPostProcessorTests { @Test public void simpleMessageListener() { - final ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(Config.class, - SimpleMessageListenerTestBean.class); + ConfigurableApplicationContext context = new AnnotationConfigApplicationContext( + Config.class, SimpleMessageListenerTestBean.class); JmsListenerContainerTestFactory factory = context.getBean(JmsListenerContainerTestFactory.class); - assertEquals("one container should have been registered", 1, factory.getContainers().size()); - MessageListenerTestContainer container = factory.getContainers().get(0); + assertEquals("One container should have been registered", 1, factory.getListenerContainers().size()); + MessageListenerTestContainer container = factory.getListenerContainers().get(0); JmsListenerEndpoint endpoint = container.getEndpoint(); assertEquals("Wrong endpoint type", MethodJmsListenerEndpoint.class, endpoint.getClass()); MethodJmsListenerEndpoint methodEndpoint = (MethodJmsListenerEndpoint) endpoint; assertNotNull(methodEndpoint.getBean()); assertNotNull(methodEndpoint.getMethod()); - assertTrue("Should have been started " + container, container.isStarted()); SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); - methodEndpoint.setupMessageContainer(listenerContainer); + methodEndpoint.setupListenerContainer(listenerContainer); assertNotNull(listenerContainer.getMessageListener()); + context.start(); + assertTrue("Should have been started " + container, container.isStarted()); + context.close(); // Close and stop the listeners assertTrue("Should have been stopped " + container, container.isStopped()); } @Test public void metaAnnotationIsDiscovered() { - final ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(Config.class, - MetaAnnotationTestBean.class); + ConfigurableApplicationContext context = new AnnotationConfigApplicationContext( + Config.class, MetaAnnotationTestBean.class); JmsListenerContainerTestFactory factory = context.getBean(JmsListenerContainerTestFactory.class); - assertEquals("one container should have been registered", 1, factory.getContainers().size()); - JmsListenerEndpoint endpoint = factory.getContainers().get(0).getEndpoint(); + assertEquals("one container should have been registered", 1, factory.getListenerContainers().size()); + JmsListenerEndpoint endpoint = factory.getListenerContainers().get(0).getEndpoint(); assertEquals("metaTestQueue", ((AbstractJmsListenerEndpoint) endpoint).getDestination()); } + @Component static class SimpleMessageListenerTestBean { @@ -88,6 +90,7 @@ public class JmsListenerAnnotationBeanPostProcessorTests { } + @Component static class MetaAnnotationTestBean { @@ -101,9 +104,9 @@ public class JmsListenerAnnotationBeanPostProcessorTests { @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) static @interface FooListener { - } + @Configuration static class Config { @@ -124,6 +127,6 @@ public class JmsListenerAnnotationBeanPostProcessorTests { public JmsListenerContainerTestFactory testFactory() { return new JmsListenerContainerTestFactory(); } - } + } diff --git a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryIntegrationTests.java b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryIntegrationTests.java index cbcc4a904a9..77a6585ec06 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryIntegrationTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryIntegrationTests.java @@ -83,7 +83,7 @@ public class JmsListenerContainerFactoryIntegrationTests { @SuppressWarnings("unchecked") private void invokeListener(JmsListenerEndpoint endpoint, Message message) throws JMSException { DefaultMessageListenerContainer messageListenerContainer = - containerFactory.createMessageListenerContainer(endpoint); + containerFactory.createListenerContainer(endpoint); Object listener = messageListenerContainer.getMessageListener(); if (listener instanceof SessionAwareMessageListener) { ((SessionAwareMessageListener) listener).onMessage(message, mock(Session.class)); diff --git a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryTests.java b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryTests.java index 2e63fba4e79..fe570a0368a 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryTests.java @@ -45,7 +45,6 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; /** - * * @author Stephane Nicoll */ public class JmsListenerContainerFactoryTests { @@ -61,6 +60,7 @@ public class JmsListenerContainerFactoryTests { private final TransactionManager transactionManager = mock(TransactionManager.class); + @Test public void createSimpleContainer() { SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); @@ -71,14 +71,13 @@ public class JmsListenerContainerFactoryTests { endpoint.setMessageListener(messageListener); endpoint.setDestination("myQueue"); - SimpleMessageListenerContainer container = factory.createMessageListenerContainer(endpoint); + SimpleMessageListenerContainer container = factory.createListenerContainer(endpoint); assertDefaultJmsConfig(container); assertEquals(messageListener, container.getMessageListener()); assertEquals("myQueue", container.getDestinationName()); } - @Test public void createJmsContainerFullConfig() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); @@ -91,7 +90,7 @@ public class JmsListenerContainerFactoryTests { MessageListener messageListener = new MessageListenerAdapter(); endpoint.setMessageListener(messageListener); endpoint.setDestination("myQueue"); - DefaultMessageListenerContainer container = factory.createMessageListenerContainer(endpoint); + DefaultMessageListenerContainer container = factory.createListenerContainer(endpoint); assertDefaultJmsConfig(container); assertEquals(DefaultMessageListenerContainer.CACHE_CONSUMER, container.getCacheLevel()); @@ -107,13 +106,13 @@ public class JmsListenerContainerFactoryTests { public void createJcaContainerFullConfig() { DefaultJcaListenerContainerFactory factory = new DefaultJcaListenerContainerFactory(); setDefaultJcaConfig(factory); - factory.getActivationSpecConfig().setConcurrency("10"); + factory.setConcurrency("10"); SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint(); MessageListener messageListener = new MessageListenerAdapter(); endpoint.setMessageListener(messageListener); endpoint.setDestination("myQueue"); - JmsMessageEndpointManager container = factory.createMessageListenerContainer(endpoint); + JmsMessageEndpointManager container = factory.createListenerContainer(endpoint); assertDefaultJcaConfig(container); assertEquals(10, container.getActivationSpecConfig().getMaxConcurrency()); @@ -130,7 +129,7 @@ public class JmsListenerContainerFactoryTests { SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint(); endpoint.setMessageListener(new MessageListenerAdapter()); thrown.expect(IllegalStateException.class); - factory.createMessageListenerContainer(endpoint); + factory.createListenerContainer(endpoint); } @Test @@ -144,7 +143,7 @@ public class JmsListenerContainerFactoryTests { MessageListener messageListener = new MessageListenerAdapter(); endpoint.setMessageListener(messageListener); endpoint.setDestination("myQueue"); - DefaultMessageListenerContainer container = factory.createMessageListenerContainer(endpoint); + DefaultMessageListenerContainer container = factory.createListenerContainer(endpoint); assertSame(backOff, new DirectFieldAccessor(container).getPropertyValue("backOff")); } @@ -174,13 +173,11 @@ public class JmsListenerContainerFactoryTests { private void setDefaultJcaConfig(DefaultJcaListenerContainerFactory factory) { factory.setDestinationResolver(destinationResolver); factory.setTransactionManager(transactionManager); - JmsActivationSpecConfig config = new JmsActivationSpecConfig(); - config.setMessageConverter(messageConverter); - config.setAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE); - config.setPubSubDomain(true); - config.setSubscriptionDurable(true); - config.setClientId("client-1234"); - factory.setActivationSpecConfig(config); + factory.setMessageConverter(messageConverter); + factory.setAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE); + factory.setPubSubDomain(true); + factory.setSubscriptionDurable(true); + factory.setClientId("client-1234"); } private void assertDefaultJcaConfig(JmsMessageEndpointManager container) { diff --git a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerTestFactory.java b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerTestFactory.java index 9745a7c6ec7..f67d976fcf9 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerTestFactory.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerTestFactory.java @@ -25,17 +25,17 @@ import java.util.List; */ public class JmsListenerContainerTestFactory implements JmsListenerContainerFactory { - private final List containers = + private final List listenerContainers = new ArrayList(); - public List getContainers() { - return containers; + public List getListenerContainers() { + return listenerContainers; } @Override - public MessageListenerTestContainer createMessageListenerContainer(JmsListenerEndpoint endpoint) { + public MessageListenerTestContainer createListenerContainer(JmsListenerEndpoint endpoint) { MessageListenerTestContainer container = new MessageListenerTestContainer(endpoint); - this.containers.add(container); + this.listenerContainers.add(container); return container; } diff --git a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointRegistrarTests.java b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointRegistrarTests.java index f4dd33da297..aae03c2d106 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointRegistrarTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointRegistrarTests.java @@ -65,8 +65,8 @@ public class JmsListenerEndpointRegistrarTests { registrar.setContainerFactory(containerFactory); registrar.registerEndpoint(endpoint, null); registrar.afterPropertiesSet(); - assertNotNull("Container not created", registry.getContainer("some id")); - assertEquals(1, registry.getContainers().size()); + assertNotNull("Container not created", registry.getListenerContainer("some id")); + assertEquals(1, registry.getListenerContainers().size()); } @Test @@ -87,8 +87,8 @@ public class JmsListenerEndpointRegistrarTests { registrar.setContainerFactory(containerFactory); registrar.registerEndpoint(endpoint); registrar.afterPropertiesSet(); - assertNotNull("Container not created", registry.getContainer("myEndpoint")); - assertEquals(1, registry.getContainers().size()); + assertNotNull("Container not created", registry.getListenerContainer("myEndpoint")); + assertEquals(1, registry.getListenerContainers().size()); } } diff --git a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointRegistryTests.java b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointRegistryTests.java index 0591e54c36a..52d2996087c 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointRegistryTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointRegistryTests.java @@ -36,27 +36,27 @@ public class JmsListenerEndpointRegistryTests { @Test public void createWithNullEndpoint() { thrown.expect(IllegalArgumentException.class); - registry.createJmsListenerContainer(null, containerFactory); + registry.registerListenerContainer(null, containerFactory); } @Test public void createWithNullEndpointId() { thrown.expect(IllegalArgumentException.class); - registry.createJmsListenerContainer(new SimpleJmsListenerEndpoint(), containerFactory); + registry.registerListenerContainer(new SimpleJmsListenerEndpoint(), containerFactory); } @Test public void createWithNullContainerFactory() { thrown.expect(IllegalArgumentException.class); - registry.createJmsListenerContainer(createEndpoint("foo", "myDestination"), null); + registry.registerListenerContainer(createEndpoint("foo", "myDestination"), null); } @Test public void createWithDuplicateEndpointId() { - registry.createJmsListenerContainer(createEndpoint("test", "queue"), containerFactory); + registry.registerListenerContainer(createEndpoint("test", "queue"), containerFactory); thrown.expect(IllegalStateException.class); - registry.createJmsListenerContainer(createEndpoint("test", "queue"), containerFactory); + registry.registerListenerContainer(createEndpoint("test", "queue"), containerFactory); } private SimpleJmsListenerEndpoint createEndpoint(String id, String destinationName) { diff --git a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointTests.java b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointTests.java index 3e1780093b4..8e9143dcc99 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerEndpointTests.java @@ -16,9 +16,6 @@ package org.springframework.jms.config; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; - import javax.jms.MessageListener; import org.junit.Rule; @@ -33,8 +30,10 @@ import org.springframework.jms.listener.adapter.MessageListenerAdapter; import org.springframework.jms.listener.endpoint.JmsActivationSpecConfig; import org.springframework.jms.listener.endpoint.JmsMessageEndpointManager; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + /** - * * @author Stephane Nicoll */ public class JmsListenerEndpointTests { @@ -53,10 +52,10 @@ public class JmsListenerEndpointTests { endpoint.setConcurrency("5-10"); endpoint.setMessageListener(messageListener); - endpoint.setupMessageContainer(container); + endpoint.setupListenerContainer(container); assertEquals("myQueue", container.getDestinationName()); assertEquals("foo = 'bar'", container.getMessageSelector()); - assertEquals("mySubscription", container.getDurableSubscriptionName()); + assertEquals("mySubscription", container.getSubscriptionName()); assertEquals(5, container.getConcurrentConsumers()); assertEquals(10, container.getMaxConcurrentConsumers()); assertEquals(messageListener, container.getMessageListener()); @@ -73,11 +72,11 @@ public class JmsListenerEndpointTests { endpoint.setConcurrency("10"); endpoint.setMessageListener(messageListener); - endpoint.setupMessageContainer(container); + endpoint.setupListenerContainer(container); JmsActivationSpecConfig config = container.getActivationSpecConfig(); assertEquals("myQueue", config.getDestinationName()); assertEquals("foo = 'bar'", config.getMessageSelector()); - assertEquals("mySubscription", config.getDurableSubscriptionName()); + assertEquals("mySubscription", config.getSubscriptionName()); assertEquals(10, config.getMaxConcurrency()); assertEquals(messageListener, container.getMessageListener()); } @@ -90,7 +89,7 @@ public class JmsListenerEndpointTests { endpoint.setConcurrency("5-10"); // simple implementation only support max value endpoint.setMessageListener(messageListener); - endpoint.setupMessageContainer(container); + endpoint.setupListenerContainer(container); assertEquals(10, new DirectFieldAccessor(container).getPropertyValue("concurrentConsumers")); } @@ -100,7 +99,7 @@ public class JmsListenerEndpointTests { SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint(); thrown.expect(IllegalStateException.class); - endpoint.setupMessageContainer(container); + endpoint.setupListenerContainer(container); } @Test @@ -110,7 +109,7 @@ public class JmsListenerEndpointTests { endpoint.setMessageListener(new MessageListenerAdapter()); thrown.expect(IllegalArgumentException.class); - endpoint.setupMessageContainer(container); + endpoint.setupListenerContainer(container); } diff --git a/spring-jms/src/test/java/org/springframework/jms/config/JmsNamespaceHandlerTests.java b/spring-jms/src/test/java/org/springframework/jms/config/JmsNamespaceHandlerTests.java index 8d24ee0ab51..cfbf50c45c5 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/JmsNamespaceHandlerTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/JmsNamespaceHandlerTests.java @@ -145,7 +145,7 @@ public class JmsNamespaceHandlerTests { assertNotNull("No factory registered with testJmsFactory id", factory); DefaultMessageListenerContainer container = - factory.createMessageListenerContainer(createDummyEndpoint()); + factory.createListenerContainer(createDummyEndpoint()); assertEquals("explicit connection factory not set", context.getBean(EXPLICIT_CONNECTION_FACTORY), container.getConnectionFactory()); assertEquals("explicit destination resolver not set", @@ -156,9 +156,8 @@ public class JmsNamespaceHandlerTests { assertEquals("wrong concurrency", 3, container.getConcurrentConsumers()); assertEquals("wrong concurrency", 5, container.getMaxConcurrentConsumers()); assertEquals("wrong prefetch", 50, container.getMaxMessagesPerTask()); - assertSame(context.getBean("testBackOff"),new DirectFieldAccessor(container).getPropertyValue("backOff")); - - assertEquals("phase cannot be customized by the factory", Integer.MAX_VALUE, container.getPhase()); + assertEquals("Wrong phase", 99, container.getPhase()); + assertSame(context.getBean("testBackOff"), new DirectFieldAccessor(container).getPropertyValue("backOff")); } @Test @@ -169,14 +168,14 @@ public class JmsNamespaceHandlerTests { assertNotNull("No factory registered with testJcaFactory id", factory); JmsMessageEndpointManager container = - factory.createMessageListenerContainer(createDummyEndpoint()); + factory.createListenerContainer(createDummyEndpoint()); assertEquals("explicit resource adapter not set", context.getBean("testResourceAdapter"),container.getResourceAdapter()); assertEquals("explicit message converter not set", context.getBean("testMessageConverter"), container.getActivationSpecConfig().getMessageConverter()); assertEquals("wrong concurrency", 5, container.getActivationSpecConfig().getMaxConcurrency()); assertEquals("Wrong prefetch", 50, container.getActivationSpecConfig().getPrefetchSize()); - assertEquals("phase cannot be customized by the factory", Integer.MAX_VALUE, container.getPhase()); + assertEquals("Wrong phase", 77, container.getPhase()); } @Test diff --git a/spring-jms/src/test/java/org/springframework/jms/config/MessageListenerTestContainer.java b/spring-jms/src/test/java/org/springframework/jms/config/MessageListenerTestContainer.java index 2b6a0258f69..650d9a7bec1 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/MessageListenerTestContainer.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/MessageListenerTestContainer.java @@ -23,7 +23,6 @@ import org.springframework.jms.listener.MessageListenerContainer; import org.springframework.jms.support.converter.MessageConverter; /** - * * @author Stephane Nicoll */ public class MessageListenerTestContainer @@ -57,17 +56,15 @@ public class MessageListenerTestContainer @Override public void start() throws JmsException { + if (!initializationInvoked) { + throw new IllegalStateException("afterPropertiesSet should have been invoked before start on " + this); + } if (startInvoked) { throw new IllegalStateException("Start already invoked on " + this); } startInvoked = true; } - @Override - public boolean isRunning() { - return startInvoked && !stopInvoked; - } - @Override public void stop() throws JmsException { if (stopInvoked) { @@ -77,8 +74,28 @@ public class MessageListenerTestContainer } @Override - public void setupMessageListener(Object messageListener) { + public boolean isRunning() { + return startInvoked && !stopInvoked; + } + @Override + public int getPhase() { + return 0; + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public void stop(Runnable callback) { + stopInvoked = true; + callback.run(); + } + + @Override + public void setupMessageListener(Object messageListener) { } @Override @@ -93,10 +110,6 @@ public class MessageListenerTestContainer @Override public void afterPropertiesSet() { - if (!startInvoked) { - throw new IllegalStateException("Start should have been invoked before " + - "afterPropertiesSet on " + this); - } initializationInvoked = true; } diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/endpoint/DefaultJmsActivationSpecFactoryTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/endpoint/DefaultJmsActivationSpecFactoryTests.java index 1a78ba5f791..40fdaee4b78 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/endpoint/DefaultJmsActivationSpecFactoryTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/endpoint/DefaultJmsActivationSpecFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,9 +62,7 @@ public class DefaultJmsActivationSpecFactoryTests extends TestCase { Destination destination = new StubQueue(); DestinationResolver destinationResolver = mock(DestinationResolver.class); - - given(destinationResolver.resolveDestinationName(null, "destinationname", - false)).willReturn(destination); + given(destinationResolver.resolveDestinationName(null, "destinationname", false)).willReturn(destination); DefaultJmsActivationSpecFactory activationSpecFactory = new DefaultJmsActivationSpecFactory(); activationSpecFactory.setDestinationResolver(destinationResolver);