Customize QosSettings for JMS replies
This commit introduces QosSettings that gather the Qualify of Service settings one can use when sending a message. Such object can now be associated to any JMS endpoint that allows to send a reply as part of the processing of an incoming message. Issue: SPR-15408
This commit is contained in:
parent
a49a0007b2
commit
1c0b3be6e6
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.jms.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
@ -54,6 +55,8 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
|
|||
|
||||
private Boolean replyPubSubDomain;
|
||||
|
||||
private QosSettings replyQosSettings;
|
||||
|
||||
private Boolean subscriptionDurable;
|
||||
|
||||
private Boolean subscriptionShared;
|
||||
|
@ -121,6 +124,13 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
|
|||
this.replyPubSubDomain = replyPubSubDomain;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see AbstractMessageListenerContainer#setReplyQosSettings(QosSettings)
|
||||
*/
|
||||
public void setReplyQosSettings(QosSettings replyQosSettings) {
|
||||
this.replyQosSettings = replyQosSettings;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see AbstractMessageListenerContainer#setSubscriptionDurable(boolean)
|
||||
*/
|
||||
|
@ -184,6 +194,9 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
|
|||
if (this.replyPubSubDomain != null) {
|
||||
instance.setReplyPubSubDomain(this.replyPubSubDomain);
|
||||
}
|
||||
if (this.replyQosSettings != null) {
|
||||
instance.setReplyQosSettings(this.replyQosSettings);
|
||||
}
|
||||
if (this.subscriptionDurable != null) {
|
||||
instance.setSubscriptionDurable(this.subscriptionDurable);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.springframework.beans.factory.config.EmbeddedValueResolver;
|
|||
import org.springframework.core.annotation.AnnotatedElementUtils;
|
||||
import org.springframework.jms.listener.MessageListenerContainer;
|
||||
import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
|
@ -147,6 +148,10 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint imple
|
|||
messageListener.setDefaultResponseQueueName(responseDestination);
|
||||
}
|
||||
}
|
||||
QosSettings responseQosSettings = container.getReplyQosSettings();
|
||||
if (responseQosSettings != null) {
|
||||
messageListener.setResponseQosSettings(responseQosSettings);
|
||||
}
|
||||
MessageConverter messageConverter = container.getMessageConverter();
|
||||
if (messageConverter != null) {
|
||||
messageListener.setMessageConverter(messageConverter);
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -30,6 +30,7 @@ import javax.jms.Session;
|
|||
import javax.jms.Topic;
|
||||
|
||||
import org.springframework.jms.support.JmsUtils;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
@ -168,6 +169,8 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
|
|||
|
||||
private Boolean replyPubSubDomain;
|
||||
|
||||
private QosSettings replyQosSettings;
|
||||
|
||||
private boolean pubSubNoLocal = false;
|
||||
|
||||
private MessageConverter messageConverter;
|
||||
|
@ -490,6 +493,22 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the {@link QosSettings} to use when sending a reply. Can be set to
|
||||
* {@code null} to indicate that the broker's defaults should be used.
|
||||
* @param replyQosSettings the QoS settings to use when sending a reply or {@code null}
|
||||
* to use the default vas.
|
||||
* @since 5.0
|
||||
*/
|
||||
public void setReplyQosSettings(QosSettings replyQosSettings) {
|
||||
this.replyQosSettings = replyQosSettings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QosSettings getReplyQosSettings() {
|
||||
return this.replyQosSettings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link MessageConverter} strategy for converting JMS Messages.
|
||||
* @since 4.1
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.jms.listener;
|
||||
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
|
||||
|
@ -62,4 +63,11 @@ public interface MessageListenerContainer extends SmartLifecycle {
|
|||
*/
|
||||
boolean isReplyPubSubDomain();
|
||||
|
||||
/**
|
||||
* Return the {@link QosSettings} to use when sending a reply or {@code null}
|
||||
* if the broker's defaults should be used.
|
||||
* @since 5.0
|
||||
*/
|
||||
QosSettings getReplyQosSettings();
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.springframework.jms.listener.SessionAwareMessageListener;
|
||||
import org.springframework.jms.support.JmsHeaderMapper;
|
||||
import org.springframework.jms.support.JmsUtils;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.SimpleJmsHeaderMapper;
|
||||
import org.springframework.jms.support.converter.MessageConversionException;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
|
@ -66,6 +67,7 @@ public abstract class AbstractAdaptableMessageListener
|
|||
|
||||
private final MessagingMessageConverterAdapter messagingMessageConverter = new MessagingMessageConverterAdapter();
|
||||
|
||||
private QosSettings responseQosSettings;
|
||||
|
||||
/**
|
||||
* Set the default destination to send response messages to. This will be applied
|
||||
|
@ -167,6 +169,24 @@ public abstract class AbstractAdaptableMessageListener
|
|||
return this.messagingMessageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link QosSettings} to use when sending a response. Can be set to
|
||||
* {@code null} to indicate that the broker's defaults should be used.
|
||||
* @param responseQosSettings the QoS settings to use when sending a response or
|
||||
* {@code null} to use the default values.
|
||||
* @since 5.0
|
||||
*/
|
||||
public void setResponseQosSettings(QosSettings responseQosSettings) {
|
||||
this.responseQosSettings = responseQosSettings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the {@link QosSettings} to use when sending a response or {@code null} if
|
||||
* the defaults should be used.
|
||||
*/
|
||||
protected QosSettings getResponseQosSettings() {
|
||||
return this.responseQosSettings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Standard JMS {@link MessageListener} entry point.
|
||||
|
@ -399,7 +419,14 @@ public abstract class AbstractAdaptableMessageListener
|
|||
MessageProducer producer = session.createProducer(destination);
|
||||
try {
|
||||
postProcessProducer(producer, response);
|
||||
producer.send(response);
|
||||
QosSettings settings = getResponseQosSettings();
|
||||
if (settings != null) {
|
||||
producer.send(response, settings.getDeliveryMode(), settings.getPriority(),
|
||||
settings.getTimeToLive());
|
||||
}
|
||||
else {
|
||||
producer.send(response);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
JmsUtils.closeMessageProducer(producer);
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -19,6 +19,7 @@ package org.springframework.jms.listener.endpoint;
|
|||
import javax.jms.Session;
|
||||
|
||||
import org.springframework.core.Constants;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
|
||||
/**
|
||||
|
@ -48,6 +49,8 @@ public class JmsActivationSpecConfig {
|
|||
|
||||
private Boolean replyPubSubDomain;
|
||||
|
||||
private QosSettings replyQosSettings;
|
||||
|
||||
private boolean subscriptionDurable = false;
|
||||
|
||||
private boolean subscriptionShared = false;
|
||||
|
@ -96,6 +99,14 @@ public class JmsActivationSpecConfig {
|
|||
}
|
||||
}
|
||||
|
||||
public void setReplyQosSettings(QosSettings replyQosSettings) {
|
||||
this.replyQosSettings = replyQosSettings;
|
||||
}
|
||||
|
||||
public QosSettings getReplyQosSettings() {
|
||||
return this.replyQosSettings;
|
||||
}
|
||||
|
||||
public void setSubscriptionDurable(boolean subscriptionDurable) {
|
||||
this.subscriptionDurable = subscriptionDurable;
|
||||
if (subscriptionDurable) {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -22,6 +22,7 @@ import javax.resource.ResourceException;
|
|||
import org.springframework.beans.factory.BeanNameAware;
|
||||
import org.springframework.jca.endpoint.GenericMessageEndpointManager;
|
||||
import org.springframework.jms.listener.MessageListenerContainer;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
|
||||
|
@ -217,4 +218,12 @@ public class JmsMessageEndpointManager extends GenericMessageEndpointManager
|
|||
throw new IllegalStateException("Could not determine reply pubSubDomain - no activation spec config is set");
|
||||
}
|
||||
|
||||
@Override
|
||||
public QosSettings getReplyQosSettings() {
|
||||
JmsActivationSpecConfig config = getActivationSpecConfig();
|
||||
if (config != null) {
|
||||
return config.getReplyQosSettings();
|
||||
}
|
||||
throw new IllegalStateException("Could not determine reply qosSettings - no activation spec config is set");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.jms.support;
|
||||
|
||||
import javax.jms.Message;
|
||||
|
||||
/**
|
||||
* Gather the Quality of Service settings that can be used when sending a message.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 5.0
|
||||
*/
|
||||
public class QosSettings {
|
||||
|
||||
private int deliveryMode;
|
||||
|
||||
private int priority;
|
||||
|
||||
private long timeToLive;
|
||||
|
||||
/**
|
||||
* Create a new instance with the default settings.
|
||||
* @see Message#DEFAULT_DELIVERY_MODE
|
||||
* @see Message#DEFAULT_PRIORITY
|
||||
* @see Message#DEFAULT_TIME_TO_LIVE
|
||||
*/
|
||||
public QosSettings() {
|
||||
this(Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY,
|
||||
Message.DEFAULT_TIME_TO_LIVE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance with the specified settings.
|
||||
*/
|
||||
public QosSettings(int deliveryMode, int priority, long timeToLive) {
|
||||
this.deliveryMode = deliveryMode;
|
||||
this.priority = priority;
|
||||
this.timeToLive = timeToLive;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the delivery mode to use when sending a message.
|
||||
* Default is the JMS Message default: "PERSISTENT".
|
||||
* @param deliveryMode the delivery mode to use
|
||||
* @see javax.jms.DeliveryMode#PERSISTENT
|
||||
* @see javax.jms.DeliveryMode#NON_PERSISTENT
|
||||
* @see javax.jms.Message#DEFAULT_DELIVERY_MODE
|
||||
* @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
|
||||
*/
|
||||
public void setDeliveryMode(int deliveryMode) {
|
||||
this.deliveryMode = deliveryMode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the delivery mode to use when sending a message.
|
||||
*/
|
||||
public int getDeliveryMode() {
|
||||
return this.deliveryMode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the priority of a message when sending.
|
||||
* @see javax.jms.Message#DEFAULT_PRIORITY
|
||||
* @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
|
||||
*/
|
||||
public void setPriority(int priority) {
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the priority of a message when sending.
|
||||
*/
|
||||
public int getPriority() {
|
||||
return this.priority;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the time-to-live of the message when sending.
|
||||
* @param timeToLive the message's lifetime (in milliseconds)
|
||||
* @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
|
||||
* @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
|
||||
*/
|
||||
public void setTimeToLive(long timeToLive) {
|
||||
this.timeToLive = timeToLive;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the time-to-live of the message when sending.
|
||||
*/
|
||||
public long getTimeToLive() {
|
||||
return this.timeToLive;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
QosSettings that = (QosSettings) o;
|
||||
|
||||
if (this.deliveryMode != that.deliveryMode) return false;
|
||||
if (this.priority != that.priority) return false;
|
||||
return this.timeToLive == that.timeToLive;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = this.deliveryMode;
|
||||
result = 31 * result + this.priority;
|
||||
result = 31 * result + (int) (this.timeToLive ^ (this.timeToLive >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "QosSettings{" + "deliveryMode=" + deliveryMode +
|
||||
", priority=" + priority +
|
||||
", timeToLive=" + timeToLive +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -34,6 +34,7 @@ import org.springframework.jms.listener.adapter.MessageListenerAdapter;
|
|||
import org.springframework.jms.listener.endpoint.JmsActivationSpecConfig;
|
||||
import org.springframework.jms.listener.endpoint.JmsMessageEndpointManager;
|
||||
import org.springframework.jms.listener.endpoint.StubJmsActivationSpecFactory;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.converter.SimpleMessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
|
@ -158,6 +159,7 @@ public class JmsListenerContainerFactoryTests {
|
|||
factory.setSessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
|
||||
factory.setPubSubDomain(true);
|
||||
factory.setReplyPubSubDomain(true);
|
||||
factory.setReplyQosSettings(new QosSettings(1, 7, 5000));
|
||||
factory.setSubscriptionDurable(true);
|
||||
factory.setClientId("client-1234");
|
||||
factory.setAutoStartup(false);
|
||||
|
@ -171,6 +173,7 @@ public class JmsListenerContainerFactoryTests {
|
|||
assertEquals(Session.DUPS_OK_ACKNOWLEDGE, container.getSessionAcknowledgeMode());
|
||||
assertEquals(true, container.isPubSubDomain());
|
||||
assertEquals(true, container.isReplyPubSubDomain());
|
||||
assertEquals(new QosSettings(1, 7, 5000), container.getReplyQosSettings());
|
||||
assertEquals(true, container.isSubscriptionDurable());
|
||||
assertEquals("client-1234", container.getClientId());
|
||||
assertEquals(false, container.isAutoStartup());
|
||||
|
@ -182,6 +185,7 @@ public class JmsListenerContainerFactoryTests {
|
|||
factory.setMessageConverter(messageConverter);
|
||||
factory.setAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
|
||||
factory.setPubSubDomain(true);
|
||||
factory.setReplyQosSettings(new QosSettings(1, 7, 5000));
|
||||
factory.setSubscriptionDurable(true);
|
||||
factory.setClientId("client-1234");
|
||||
}
|
||||
|
@ -193,6 +197,7 @@ public class JmsListenerContainerFactoryTests {
|
|||
assertNotNull(config);
|
||||
assertEquals(Session.DUPS_OK_ACKNOWLEDGE, config.getAcknowledgeMode());
|
||||
assertEquals(true, config.isPubSubDomain());
|
||||
assertEquals(new QosSettings(1, 7, 5000), container.getReplyQosSettings());
|
||||
assertEquals(true, config.isSubscriptionDurable());
|
||||
assertEquals("client-1234", config.getClientId());
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -20,6 +20,7 @@ import org.springframework.beans.factory.DisposableBean;
|
|||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.jms.JmsException;
|
||||
import org.springframework.jms.listener.MessageListenerContainer;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
|
||||
|
@ -126,6 +127,11 @@ public class MessageListenerTestContainer implements MessageListenerContainer, I
|
|||
return isPubSubDomain();
|
||||
}
|
||||
|
||||
@Override
|
||||
public QosSettings getReplyQosSettings() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
initializationInvoked = true;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -47,6 +47,7 @@ import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter;
|
|||
import org.springframework.jms.listener.adapter.ReplyFailureException;
|
||||
import org.springframework.jms.support.JmsHeaders;
|
||||
import org.springframework.jms.support.JmsMessageHeaderAccessor;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
import org.springframework.messaging.Message;
|
||||
|
@ -315,8 +316,37 @@ public class MethodJmsListenerEndpointTests {
|
|||
assertDefaultListenerMethodInvocation();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processAndReplyWithCustomReplyQosSettings() throws JMSException {
|
||||
String methodName = "processAndReplyWithSendTo";
|
||||
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
|
||||
QosSettings replyQosSettings = new QosSettings(1, 6, 6000);
|
||||
container.setReplyQosSettings(replyQosSettings);
|
||||
MessagingMessageListenerAdapter listener = createInstance(this.factory,
|
||||
getListenerMethod(methodName, String.class), container);
|
||||
processAndReplyWithSendTo(listener, "replyDestination", false, replyQosSettings);
|
||||
assertListenerMethodInvocation(this.sample, methodName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processAndReplyWithNullReplyQosSettings() throws JMSException {
|
||||
String methodName = "processAndReplyWithSendTo";
|
||||
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
|
||||
container.setReplyQosSettings(null);
|
||||
MessagingMessageListenerAdapter listener = createInstance(this.factory,
|
||||
getListenerMethod(methodName, String.class), container);
|
||||
processAndReplyWithSendTo(listener, "replyDestination", false);
|
||||
assertListenerMethodInvocation(this.sample, methodName);
|
||||
}
|
||||
|
||||
private void processAndReplyWithSendTo(MessagingMessageListenerAdapter listener,
|
||||
String replyDestinationName, boolean pubSubDomain) throws JMSException {
|
||||
processAndReplyWithSendTo(listener, replyDestinationName, pubSubDomain, null);
|
||||
}
|
||||
|
||||
private void processAndReplyWithSendTo(MessagingMessageListenerAdapter listener,
|
||||
String replyDestinationName, boolean pubSubDomain,
|
||||
QosSettings replyQosSettings) throws JMSException {
|
||||
String body = "echo text";
|
||||
String correlationId = "link-1234";
|
||||
Destination replyDestination = new Destination() {};
|
||||
|
@ -338,7 +368,13 @@ public class MethodJmsListenerEndpointTests {
|
|||
|
||||
verify(destinationResolver).resolveDestinationName(session, replyDestinationName, pubSubDomain);
|
||||
verify(reply).setJMSCorrelationID(correlationId);
|
||||
verify(queueSender).send(reply);
|
||||
if (replyQosSettings != null) {
|
||||
verify(queueSender).send(reply, replyQosSettings.getDeliveryMode(),
|
||||
replyQosSettings.getPriority(), replyQosSettings.getTimeToLive());
|
||||
}
|
||||
else {
|
||||
verify(queueSender).send(reply);
|
||||
}
|
||||
verify(queueSender).close();
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -19,6 +19,7 @@ package org.springframework.jms.listener.adapter;
|
|||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageProducer;
|
||||
|
@ -36,6 +37,7 @@ import org.junit.rules.ExpectedException;
|
|||
import org.springframework.beans.factory.support.StaticListableBeanFactory;
|
||||
import org.springframework.jms.StubTextMessage;
|
||||
import org.springframework.jms.support.JmsHeaders;
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.converter.MessageType;
|
||||
|
@ -202,6 +204,50 @@ public class MessagingMessageListenerAdapterTests {
|
|||
verify(messageProducer).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void replyWithCustomTimeToLive() throws JMSException {
|
||||
Session session = mock(Session.class);
|
||||
Queue replyDestination = mock(Queue.class);
|
||||
given(session.createQueue("queueOut")).willReturn(replyDestination);
|
||||
|
||||
MessageProducer messageProducer = mock(MessageProducer.class);
|
||||
TextMessage responseMessage = mock(TextMessage.class);
|
||||
given(session.createTextMessage("Response")).willReturn(responseMessage);
|
||||
given(session.createProducer(replyDestination)).willReturn(messageProducer);
|
||||
|
||||
MessagingMessageListenerAdapter listener = getPayloadInstance("Response", "replyPayloadToQueue", Message.class);
|
||||
QosSettings settings = new QosSettings();
|
||||
settings.setTimeToLive(6000);
|
||||
listener.setResponseQosSettings(settings);
|
||||
listener.onMessage(mock(javax.jms.Message.class), session);
|
||||
verify(session).createQueue("queueOut");
|
||||
verify(session).createTextMessage("Response");
|
||||
verify(messageProducer).send(responseMessage, javax.jms.Message.DEFAULT_DELIVERY_MODE,
|
||||
javax.jms.Message.DEFAULT_PRIORITY, 6000);
|
||||
verify(messageProducer).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void replyWithFullQoS() throws JMSException {
|
||||
Session session = mock(Session.class);
|
||||
Queue replyDestination = mock(Queue.class);
|
||||
given(session.createQueue("queueOut")).willReturn(replyDestination);
|
||||
|
||||
MessageProducer messageProducer = mock(MessageProducer.class);
|
||||
TextMessage responseMessage = mock(TextMessage.class);
|
||||
given(session.createTextMessage("Response")).willReturn(responseMessage);
|
||||
given(session.createProducer(replyDestination)).willReturn(messageProducer);
|
||||
|
||||
MessagingMessageListenerAdapter listener = getPayloadInstance("Response", "replyPayloadToQueue", Message.class);
|
||||
QosSettings settings = new QosSettings(DeliveryMode.NON_PERSISTENT, 6, 6000);
|
||||
listener.setResponseQosSettings(settings);
|
||||
listener.onMessage(mock(javax.jms.Message.class), session);
|
||||
verify(session).createQueue("queueOut");
|
||||
verify(session).createTextMessage("Response");
|
||||
verify(messageProducer).send(responseMessage, DeliveryMode.NON_PERSISTENT, 6, 6000);
|
||||
verify(messageProducer).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void replyPayloadToTopic() throws JMSException {
|
||||
Session session = mock(Session.class);
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -20,6 +20,8 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import org.springframework.jms.support.QosSettings;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
|
@ -61,6 +63,19 @@ public class JmsMessageEndpointManagerTests {
|
|||
assertEquals(false, endpoint.isReplyPubSubDomain());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customReplyQosSettings() {
|
||||
JmsMessageEndpointManager endpoint = new JmsMessageEndpointManager();
|
||||
JmsActivationSpecConfig config = new JmsActivationSpecConfig();
|
||||
QosSettings settings = new QosSettings(1, 3, 5);
|
||||
config.setReplyQosSettings(settings);
|
||||
endpoint.setActivationSpecConfig(config);
|
||||
assertNotNull(endpoint.getReplyQosSettings());
|
||||
assertEquals(1, endpoint.getReplyQosSettings().getDeliveryMode());
|
||||
assertEquals(3, endpoint.getReplyQosSettings().getPriority());
|
||||
assertEquals(5, endpoint.getReplyQosSettings().getTimeToLive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isPubSubDomainWithNoConfig() {
|
||||
JmsMessageEndpointManager endpoint = new JmsMessageEndpointManager();
|
||||
|
@ -77,6 +92,14 @@ public class JmsMessageEndpointManagerTests {
|
|||
endpoint.isReplyPubSubDomain();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getReplyQosSettingsWithNoConfig() {
|
||||
JmsMessageEndpointManager endpoint = new JmsMessageEndpointManager();
|
||||
|
||||
this.thrown.expect(IllegalStateException.class); // far from ideal
|
||||
endpoint.getReplyQosSettings();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getMessageConverterNoConfig() {
|
||||
JmsMessageEndpointManager endpoint = new JmsMessageEndpointManager();
|
||||
|
|
|
@ -2793,6 +2793,29 @@ example can be rewritten as follows:
|
|||
}
|
||||
----
|
||||
|
||||
Finally if you need to specify some QoS values for the response such as the priority or
|
||||
the time to live, you can configure the `JmsListenerContainerFactory` accordingly:
|
||||
|
||||
[source,java,indent=0]
|
||||
[subs="verbatim,quotes"]
|
||||
----
|
||||
@Configuration
|
||||
@EnableJms
|
||||
public class AppConfig {
|
||||
|
||||
@Bean
|
||||
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
|
||||
DefaultJmsListenerContainerFactory factory =
|
||||
new DefaultJmsListenerContainerFactory();
|
||||
factory.setConnectionFactory(connectionFactory());
|
||||
QosSettings replyQosSettings = new ReplyQosSettings();
|
||||
replyQosSettings.setPriority(2);
|
||||
replyQosSettings.setTimeToLive(10000);
|
||||
factory.setReplyQosSettings(replyQosSettings);
|
||||
return factory;
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
[[jms-namespace]]
|
||||
|
|
Loading…
Reference in New Issue