First pass completed, with support for the standard JMS 2.0 API in our CachingConnectionFactory and support for a delivery delay setting in JmsTemplate. Note that none of this has been tested against an actual JMS 2.0 provider yet, due to no such provider being available yet.
Known limitations: * Spring's SingleConnectionFactory and CachingConnectionFactory do not support createContext calls for JMSContext creation at this point. Note that the JMSContext model bypasses the point of a Connection/Session pool anyway; this will only really work with a native JMS 2.0 ConnectionFactory and provider-specific pooling such as in an EE environment. * JmsTemplate has no out-of-the-box support for send calls with an async completion listener. Note that a CompletionListener can be specified in a custom ProducerCallback implementation if really necessary. There is no special support for the simplified JMSContext API, and likely never will be: JMSContext can be used from a native ConnectionFactory directly. @Inject JMSContext isn't supported due to rather involved rules for defining and scoping the injected context which are quite at odds with the Spring way of doing these things. We strongly recommend JmsTemplate instead, or @Resource ConnectionFactory with a createContext call within a Java 7 try-with-resources clause (as shown in the specification). After all, JMSContext has primarily been designed with EE's one-session-per-connection model and JTA transactions in mind, not with Spring-style use of a native JMS provider and native JMS transactions. Issue: SPR-8197
This commit is contained in:
parent
41737e827c
commit
a3d7dc09ef
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2012 the original author or authors.
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -16,6 +16,10 @@
|
|||
|
||||
package org.springframework.jms.connection;
|
||||
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
|
@ -25,6 +29,9 @@ import javax.jms.QueueSender;
|
|||
import javax.jms.Topic;
|
||||
import javax.jms.TopicPublisher;
|
||||
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* JMS MessageProducer decorator that adapts calls to a shared MessageProducer
|
||||
* instance underneath, managing QoS settings locally within the decorator.
|
||||
|
@ -34,12 +41,42 @@ import javax.jms.TopicPublisher;
|
|||
*/
|
||||
class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublisher {
|
||||
|
||||
private static final Method setDeliveryDelayMethod =
|
||||
ClassUtils.getMethodIfAvailable(MessageProducer.class, "setDeliveryDelay", long.class);
|
||||
|
||||
private static final Method getDeliveryDelayMethod =
|
||||
ClassUtils.getMethodIfAvailable(MessageProducer.class, "getDeliveryDelay");
|
||||
|
||||
private static Class completionListenerClass;
|
||||
|
||||
private static Method sendWithCompletionListenerMethod;
|
||||
|
||||
private static Method sendWithDestinationAndCompletionListenerMethod;
|
||||
|
||||
static {
|
||||
try {
|
||||
completionListenerClass = ClassUtils.forName(
|
||||
"javax.jms.CompletionListener", CachedMessageProducer.class.getClassLoader());
|
||||
sendWithCompletionListenerMethod = MessageProducer.class.getMethod(
|
||||
"send", Message.class, int.class, int.class, long.class, completionListenerClass);
|
||||
sendWithDestinationAndCompletionListenerMethod = MessageProducer.class.getMethod(
|
||||
"send", Destination.class, Message.class, int.class, int.class, long.class, completionListenerClass);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
// No JMS 2.0 API available
|
||||
completionListenerClass = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private final MessageProducer target;
|
||||
|
||||
private Boolean originalDisableMessageID;
|
||||
|
||||
private Boolean originalDisableMessageTimestamp;
|
||||
|
||||
private Long originalDeliveryDelay;
|
||||
|
||||
private int deliveryMode;
|
||||
|
||||
private int priority;
|
||||
|
@ -57,7 +94,7 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis
|
|||
|
||||
public void setDisableMessageID(boolean disableMessageID) throws JMSException {
|
||||
if (this.originalDisableMessageID == null) {
|
||||
this.originalDisableMessageID = Boolean.valueOf(this.target.getDisableMessageID());
|
||||
this.originalDisableMessageID = this.target.getDisableMessageID();
|
||||
}
|
||||
this.target.setDisableMessageID(disableMessageID);
|
||||
}
|
||||
|
@ -68,7 +105,7 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis
|
|||
|
||||
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
|
||||
if (this.originalDisableMessageTimestamp == null) {
|
||||
this.originalDisableMessageTimestamp = Boolean.valueOf(this.target.getDisableMessageTimestamp());
|
||||
this.originalDisableMessageTimestamp = this.target.getDisableMessageTimestamp();
|
||||
}
|
||||
this.target.setDisableMessageTimestamp(disableMessageTimestamp);
|
||||
}
|
||||
|
@ -77,6 +114,17 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis
|
|||
return this.target.getDisableMessageTimestamp();
|
||||
}
|
||||
|
||||
public void setDeliveryDelay(long deliveryDelay) {
|
||||
if (this.originalDeliveryDelay == null) {
|
||||
this.originalDeliveryDelay = (Long) ReflectionUtils.invokeMethod(getDeliveryDelayMethod, this.target);
|
||||
}
|
||||
ReflectionUtils.invokeMethod(setDeliveryDelayMethod, this.target, deliveryDelay);
|
||||
}
|
||||
|
||||
public long getDeliveryDelay() {
|
||||
return (Long) ReflectionUtils.invokeMethod(getDeliveryDelayMethod, this.target);
|
||||
}
|
||||
|
||||
public void setDeliveryMode(int deliveryMode) {
|
||||
this.deliveryMode = deliveryMode;
|
||||
}
|
||||
|
@ -156,18 +204,66 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis
|
|||
public void close() throws JMSException {
|
||||
// It's a cached MessageProducer... reset properties only.
|
||||
if (this.originalDisableMessageID != null) {
|
||||
this.target.setDisableMessageID(this.originalDisableMessageID.booleanValue());
|
||||
this.target.setDisableMessageID(this.originalDisableMessageID);
|
||||
this.originalDisableMessageID = null;
|
||||
}
|
||||
if (this.originalDisableMessageTimestamp != null) {
|
||||
this.target.setDisableMessageTimestamp(this.originalDisableMessageTimestamp.booleanValue());
|
||||
this.target.setDisableMessageTimestamp(this.originalDisableMessageTimestamp);
|
||||
this.originalDisableMessageTimestamp = null;
|
||||
}
|
||||
if (this.originalDeliveryDelay != null) {
|
||||
ReflectionUtils.invokeMethod(setDeliveryDelayMethod, this.target, this.originalDeliveryDelay);
|
||||
this.originalDeliveryDelay = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public String toString() {
|
||||
return "Cached JMS MessageProducer: " + this.target;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Build a dynamic proxy that reflectively adapts to JMS 2.0 API methods, if necessary.
|
||||
* Otherwise simply return this CachedMessageProducer instance itself.
|
||||
*/
|
||||
public MessageProducer getProxyIfNecessary() {
|
||||
if (completionListenerClass != null) {
|
||||
return (MessageProducer) Proxy.newProxyInstance(CachedMessageProducer.class.getClassLoader(),
|
||||
new Class[] {MessageProducer.class, QueueSender.class, TopicPublisher.class},
|
||||
new Jms2MessageProducerInvocationHandler());
|
||||
}
|
||||
else {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reflective InvocationHandler which adapts to JMS 2.0 API methods that we
|
||||
* cannot statically compile against while preserving JMS 1.1 compatibility
|
||||
* (due to the new {@code javax.jms.CompletionListener} type in the signatures).
|
||||
*/
|
||||
private class Jms2MessageProducerInvocationHandler implements InvocationHandler {
|
||||
|
||||
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
||||
try {
|
||||
if (method.getName().equals("send") && args != null &&
|
||||
completionListenerClass.equals(method.getParameterTypes()[args.length - 1])) {
|
||||
if (args.length == 2) {
|
||||
return sendWithCompletionListenerMethod.invoke(
|
||||
target, args[0], deliveryMode, priority, timeToLive, args[1]);
|
||||
}
|
||||
else if (args.length == 3) {
|
||||
return sendWithDestinationAndCompletionListenerMethod.invoke(
|
||||
target, args[0], args[1], deliveryMode, priority, timeToLive, args[2]);
|
||||
}
|
||||
}
|
||||
return method.invoke(target, args);
|
||||
}
|
||||
catch (InvocationTargetException ex) {
|
||||
throw ex.getTargetException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2012 the original author or authors.
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -40,7 +40,9 @@ import javax.jms.Topic;
|
|||
import javax.jms.TopicSession;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session}
|
||||
|
@ -80,6 +82,10 @@ import org.springframework.util.ObjectUtils;
|
|||
*/
|
||||
public class CachingConnectionFactory extends SingleConnectionFactory {
|
||||
|
||||
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
|
||||
Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
|
||||
|
||||
|
||||
private int sessionCacheSize = 1;
|
||||
|
||||
private boolean cacheProducers = true;
|
||||
|
@ -333,7 +339,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
|
|||
null);
|
||||
}
|
||||
}
|
||||
else if (methodName.equals("createDurableSubscriber")) {
|
||||
else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) {
|
||||
Destination dest = (Destination) args[0];
|
||||
if (dest != null) {
|
||||
return getCachedConsumer(dest,
|
||||
|
@ -342,6 +348,15 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
|
|||
(String) args[1]);
|
||||
}
|
||||
}
|
||||
else if (methodName.equals("createSharedDurableConsumer")) {
|
||||
Destination dest = (Destination) args[0];
|
||||
if (dest != null) {
|
||||
return getCachedConsumer(dest,
|
||||
(args.length > 2 ? (String) args[2] : null),
|
||||
null,
|
||||
(String) args[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
|
@ -367,11 +382,11 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
|
|||
}
|
||||
this.cachedProducers.put(cacheKey, producer);
|
||||
}
|
||||
return new CachedMessageProducer(producer);
|
||||
return new CachedMessageProducer(producer).getProxyIfNecessary();
|
||||
}
|
||||
|
||||
private MessageConsumer getCachedConsumer(
|
||||
Destination dest, String selector, boolean noLocal, String subscription) throws JMSException {
|
||||
Destination dest, String selector, Boolean noLocal, String subscription) throws JMSException {
|
||||
|
||||
ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription);
|
||||
MessageConsumer consumer = this.cachedConsumers.get(cacheKey);
|
||||
|
@ -382,9 +397,27 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
|
|||
}
|
||||
else {
|
||||
if (dest instanceof Topic) {
|
||||
consumer = (subscription != null ?
|
||||
this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) :
|
||||
this.target.createConsumer(dest, selector, noLocal));
|
||||
if (noLocal == null) {
|
||||
// createSharedDurableConsumer((Topic) dest, subscription, selector);
|
||||
try {
|
||||
consumer = (MessageConsumer) createSharedDurableConsumerMethod.invoke
|
||||
(this.target, dest, subscription, selector);
|
||||
}
|
||||
catch (InvocationTargetException ex) {
|
||||
if (ex.getTargetException() instanceof JMSException) {
|
||||
throw (JMSException) ex.getTargetException();
|
||||
}
|
||||
ReflectionUtils.handleInvocationTargetException(ex);
|
||||
}
|
||||
catch (IllegalAccessException ex) {
|
||||
throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
else {
|
||||
consumer = (subscription != null ?
|
||||
this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) :
|
||||
this.target.createConsumer(dest, selector, noLocal));
|
||||
}
|
||||
}
|
||||
else {
|
||||
consumer = this.target.createConsumer(dest, selector);
|
||||
|
@ -499,11 +532,11 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
|
|||
|
||||
private final String selector;
|
||||
|
||||
private final boolean noLocal;
|
||||
private final Boolean noLocal;
|
||||
|
||||
private final String subscription;
|
||||
|
||||
public ConsumerCacheKey(Destination destination, String selector, boolean noLocal, String subscription) {
|
||||
public ConsumerCacheKey(Destination destination, String selector, Boolean noLocal, String subscription) {
|
||||
super(destination);
|
||||
this.selector = selector;
|
||||
this.noLocal = noLocal;
|
||||
|
@ -517,7 +550,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
|
|||
ConsumerCacheKey otherKey = (ConsumerCacheKey) other;
|
||||
return (destinationEquals(otherKey) &&
|
||||
ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) &&
|
||||
this.noLocal == otherKey.noLocal &&
|
||||
ObjectUtils.nullSafeEquals(this.noLocal, otherKey.noLocal) &&
|
||||
ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2012 the original author or authors.
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -528,9 +528,20 @@ public class SingleConnectionFactory
|
|||
}
|
||||
else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") ||
|
||||
method.getName().equals("createTopicSession")) {
|
||||
boolean transacted = (Boolean) args[0];
|
||||
Integer ackMode = (Integer) args[1];
|
||||
Integer mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
|
||||
// Default: JMS 2.0 createSession() method
|
||||
Integer mode = Session.AUTO_ACKNOWLEDGE;
|
||||
if (args != null) {
|
||||
if (args.length == 1) {
|
||||
// JMS 2.0 createSession(int) method
|
||||
mode = (Integer) args[0];
|
||||
}
|
||||
else if (args.length == 2) {
|
||||
// JMS 1.1 createSession(boolean, int) method
|
||||
boolean transacted = (Boolean) args[0];
|
||||
Integer ackMode = (Integer) args[1];
|
||||
mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
|
||||
}
|
||||
}
|
||||
Session session = getSession(this.target, mode);
|
||||
if (session != null) {
|
||||
if (!method.getReturnType().isInstance(session)) {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2012 the original author or authors.
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -16,6 +16,7 @@
|
|||
|
||||
package org.springframework.jms.core;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
|
@ -37,6 +38,8 @@ import org.springframework.jms.support.converter.SimpleMessageConverter;
|
|||
import org.springframework.jms.support.destination.JmsDestinationAccessor;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* Helper class that simplifies synchronous JMS access code.
|
||||
|
@ -96,6 +99,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
|
|||
public static final long RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0;
|
||||
|
||||
|
||||
private static final Method setDeliveryDelayMethod =
|
||||
ClassUtils.getMethodIfAvailable(MessageProducer.class, "setDeliveryDelay", long.class);
|
||||
|
||||
/** Internal ResourceFactory adapter for interacting with ConnectionFactoryUtils */
|
||||
private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory();
|
||||
|
||||
|
@ -113,6 +119,8 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
|
|||
|
||||
private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
|
||||
|
||||
private long deliveryDelay = 0;
|
||||
|
||||
|
||||
private boolean explicitQosEnabled = false;
|
||||
|
||||
|
@ -321,6 +329,22 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
|
|||
return this.receiveTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the delivery delay to use for send calls (in milliseconds).
|
||||
* <p>The default is 0 (no delivery delay).
|
||||
* Note that this feature requires JMS 2.0.
|
||||
*/
|
||||
public void setDeliveryDelay(long deliveryDelay) {
|
||||
this.deliveryDelay = deliveryDelay;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the delivery delay to use for send calls (in milliseconds).
|
||||
*/
|
||||
public long getDeliveryDelay() {
|
||||
return this.deliveryDelay;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set if the QOS values (deliveryMode, priority, timeToLive)
|
||||
|
@ -585,6 +609,12 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
|
|||
* @throws JMSException if thrown by JMS API methods
|
||||
*/
|
||||
protected void doSend(MessageProducer producer, Message message) throws JMSException {
|
||||
if (this.deliveryDelay > 0) {
|
||||
if (setDeliveryDelayMethod == null) {
|
||||
throw new IllegalStateException("setDeliveryDelay requires JMS 2.0");
|
||||
}
|
||||
ReflectionUtils.invokeMethod(setDeliveryDelayMethod, producer, this.deliveryDelay);
|
||||
}
|
||||
if (isExplicitQosEnabled()) {
|
||||
producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue