From 1c0b3be6e60af5704a0d0ac6e119d620fe84b0f4 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Tue, 18 Apr 2017 13:15:18 +0200 Subject: [PATCH] Customize QosSettings for JMS replies This commit introduces QosSettings that gather the Qualify of Service settings one can use when sending a message. Such object can now be associated to any JMS endpoint that allows to send a reply as part of the processing of an incoming message. Issue: SPR-15408 --- .../AbstractJmsListenerContainerFactory.java | 15 +- .../jms/config/MethodJmsListenerEndpoint.java | 5 + .../AbstractMessageListenerContainer.java | 21 ++- .../listener/MessageListenerContainer.java | 10 +- .../AbstractAdaptableMessageListener.java | 31 +++- .../endpoint/JmsActivationSpecConfig.java | 13 +- .../endpoint/JmsMessageEndpointManager.java | 11 +- .../jms/support/QosSettings.java | 135 ++++++++++++++++++ .../JmsListenerContainerFactoryTests.java | 7 +- .../config/MessageListenerTestContainer.java | 8 +- .../MethodJmsListenerEndpointTests.java | 40 +++++- .../MessagingMessageListenerAdapterTests.java | 48 ++++++- .../JmsMessageEndpointManagerTests.java | 25 +++- src/docs/asciidoc/integration.adoc | 23 +++ 14 files changed, 379 insertions(+), 13 deletions(-) create mode 100644 spring-jms/src/main/java/org/springframework/jms/support/QosSettings.java diff --git a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java index 888f8e6b2b..a49946804f 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.jms.listener.AbstractMessageListenerContainer; +import org.springframework.jms.support.QosSettings; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.destination.DestinationResolver; import org.springframework.util.ErrorHandler; @@ -54,6 +55,8 @@ public abstract class AbstractJmsListenerContainerFactory>> 32)); + return result; + } + + @Override + public String toString() { + return "QosSettings{" + "deliveryMode=" + deliveryMode + + ", priority=" + priority + + ", timeToLive=" + timeToLive + + '}'; + } +} diff --git a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryTests.java b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryTests.java index e6f4323284..7e1588aab3 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/JmsListenerContainerFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ import org.springframework.jms.listener.adapter.MessageListenerAdapter; import org.springframework.jms.listener.endpoint.JmsActivationSpecConfig; import org.springframework.jms.listener.endpoint.JmsMessageEndpointManager; import org.springframework.jms.listener.endpoint.StubJmsActivationSpecFactory; +import org.springframework.jms.support.QosSettings; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.SimpleMessageConverter; import org.springframework.jms.support.destination.DestinationResolver; @@ -158,6 +159,7 @@ public class JmsListenerContainerFactoryTests { factory.setSessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE); factory.setPubSubDomain(true); factory.setReplyPubSubDomain(true); + factory.setReplyQosSettings(new QosSettings(1, 7, 5000)); factory.setSubscriptionDurable(true); factory.setClientId("client-1234"); factory.setAutoStartup(false); @@ -171,6 +173,7 @@ public class JmsListenerContainerFactoryTests { assertEquals(Session.DUPS_OK_ACKNOWLEDGE, container.getSessionAcknowledgeMode()); assertEquals(true, container.isPubSubDomain()); assertEquals(true, container.isReplyPubSubDomain()); + assertEquals(new QosSettings(1, 7, 5000), container.getReplyQosSettings()); assertEquals(true, container.isSubscriptionDurable()); assertEquals("client-1234", container.getClientId()); assertEquals(false, container.isAutoStartup()); @@ -182,6 +185,7 @@ public class JmsListenerContainerFactoryTests { factory.setMessageConverter(messageConverter); factory.setAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE); factory.setPubSubDomain(true); + factory.setReplyQosSettings(new QosSettings(1, 7, 5000)); factory.setSubscriptionDurable(true); factory.setClientId("client-1234"); } @@ -193,6 +197,7 @@ public class JmsListenerContainerFactoryTests { assertNotNull(config); assertEquals(Session.DUPS_OK_ACKNOWLEDGE, config.getAcknowledgeMode()); assertEquals(true, config.isPubSubDomain()); + assertEquals(new QosSettings(1, 7, 5000), container.getReplyQosSettings()); assertEquals(true, config.isSubscriptionDurable()); assertEquals("client-1234", config.getClientId()); } diff --git a/spring-jms/src/test/java/org/springframework/jms/config/MessageListenerTestContainer.java b/spring-jms/src/test/java/org/springframework/jms/config/MessageListenerTestContainer.java index e21b532088..0ef73c9e32 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/MessageListenerTestContainer.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/MessageListenerTestContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.jms.JmsException; import org.springframework.jms.listener.MessageListenerContainer; +import org.springframework.jms.support.QosSettings; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.destination.DestinationResolver; @@ -126,6 +127,11 @@ public class MessageListenerTestContainer implements MessageListenerContainer, I return isPubSubDomain(); } + @Override + public QosSettings getReplyQosSettings() { + return null; + } + @Override public void afterPropertiesSet() { initializationInvoked = true; diff --git a/spring-jms/src/test/java/org/springframework/jms/config/MethodJmsListenerEndpointTests.java b/spring-jms/src/test/java/org/springframework/jms/config/MethodJmsListenerEndpointTests.java index fb16b199fb..fa3d3dfd58 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/MethodJmsListenerEndpointTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/MethodJmsListenerEndpointTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,7 @@ import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.jms.listener.adapter.ReplyFailureException; import org.springframework.jms.support.JmsHeaders; import org.springframework.jms.support.JmsMessageHeaderAccessor; +import org.springframework.jms.support.QosSettings; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.destination.DestinationResolver; import org.springframework.messaging.Message; @@ -315,8 +316,37 @@ public class MethodJmsListenerEndpointTests { assertDefaultListenerMethodInvocation(); } + @Test + public void processAndReplyWithCustomReplyQosSettings() throws JMSException { + String methodName = "processAndReplyWithSendTo"; + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + QosSettings replyQosSettings = new QosSettings(1, 6, 6000); + container.setReplyQosSettings(replyQosSettings); + MessagingMessageListenerAdapter listener = createInstance(this.factory, + getListenerMethod(methodName, String.class), container); + processAndReplyWithSendTo(listener, "replyDestination", false, replyQosSettings); + assertListenerMethodInvocation(this.sample, methodName); + } + + @Test + public void processAndReplyWithNullReplyQosSettings() throws JMSException { + String methodName = "processAndReplyWithSendTo"; + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + container.setReplyQosSettings(null); + MessagingMessageListenerAdapter listener = createInstance(this.factory, + getListenerMethod(methodName, String.class), container); + processAndReplyWithSendTo(listener, "replyDestination", false); + assertListenerMethodInvocation(this.sample, methodName); + } + private void processAndReplyWithSendTo(MessagingMessageListenerAdapter listener, String replyDestinationName, boolean pubSubDomain) throws JMSException { + processAndReplyWithSendTo(listener, replyDestinationName, pubSubDomain, null); + } + + private void processAndReplyWithSendTo(MessagingMessageListenerAdapter listener, + String replyDestinationName, boolean pubSubDomain, + QosSettings replyQosSettings) throws JMSException { String body = "echo text"; String correlationId = "link-1234"; Destination replyDestination = new Destination() {}; @@ -338,7 +368,13 @@ public class MethodJmsListenerEndpointTests { verify(destinationResolver).resolveDestinationName(session, replyDestinationName, pubSubDomain); verify(reply).setJMSCorrelationID(correlationId); - verify(queueSender).send(reply); + if (replyQosSettings != null) { + verify(queueSender).send(reply, replyQosSettings.getDeliveryMode(), + replyQosSettings.getPriority(), replyQosSettings.getTimeToLive()); + } + else { + verify(queueSender).send(reply); + } verify(queueSender).close(); } diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/adapter/MessagingMessageListenerAdapterTests.java index 0daf92c08c..229dda853e 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package org.springframework.jms.listener.adapter; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; @@ -36,6 +37,7 @@ import org.junit.rules.ExpectedException; import org.springframework.beans.factory.support.StaticListableBeanFactory; import org.springframework.jms.StubTextMessage; import org.springframework.jms.support.JmsHeaders; +import org.springframework.jms.support.QosSettings; import org.springframework.jms.support.converter.MappingJackson2MessageConverter; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessageType; @@ -202,6 +204,50 @@ public class MessagingMessageListenerAdapterTests { verify(messageProducer).close(); } + @Test + public void replyWithCustomTimeToLive() throws JMSException { + Session session = mock(Session.class); + Queue replyDestination = mock(Queue.class); + given(session.createQueue("queueOut")).willReturn(replyDestination); + + MessageProducer messageProducer = mock(MessageProducer.class); + TextMessage responseMessage = mock(TextMessage.class); + given(session.createTextMessage("Response")).willReturn(responseMessage); + given(session.createProducer(replyDestination)).willReturn(messageProducer); + + MessagingMessageListenerAdapter listener = getPayloadInstance("Response", "replyPayloadToQueue", Message.class); + QosSettings settings = new QosSettings(); + settings.setTimeToLive(6000); + listener.setResponseQosSettings(settings); + listener.onMessage(mock(javax.jms.Message.class), session); + verify(session).createQueue("queueOut"); + verify(session).createTextMessage("Response"); + verify(messageProducer).send(responseMessage, javax.jms.Message.DEFAULT_DELIVERY_MODE, + javax.jms.Message.DEFAULT_PRIORITY, 6000); + verify(messageProducer).close(); + } + + @Test + public void replyWithFullQoS() throws JMSException { + Session session = mock(Session.class); + Queue replyDestination = mock(Queue.class); + given(session.createQueue("queueOut")).willReturn(replyDestination); + + MessageProducer messageProducer = mock(MessageProducer.class); + TextMessage responseMessage = mock(TextMessage.class); + given(session.createTextMessage("Response")).willReturn(responseMessage); + given(session.createProducer(replyDestination)).willReturn(messageProducer); + + MessagingMessageListenerAdapter listener = getPayloadInstance("Response", "replyPayloadToQueue", Message.class); + QosSettings settings = new QosSettings(DeliveryMode.NON_PERSISTENT, 6, 6000); + listener.setResponseQosSettings(settings); + listener.onMessage(mock(javax.jms.Message.class), session); + verify(session).createQueue("queueOut"); + verify(session).createTextMessage("Response"); + verify(messageProducer).send(responseMessage, DeliveryMode.NON_PERSISTENT, 6, 6000); + verify(messageProducer).close(); + } + @Test public void replyPayloadToTopic() throws JMSException { Session session = mock(Session.class); diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManagerTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManagerTests.java index acb9f058f3..0b5c7cfadb 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManagerTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManagerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.springframework.jms.support.QosSettings; + import static org.junit.Assert.*; /** @@ -61,6 +63,19 @@ public class JmsMessageEndpointManagerTests { assertEquals(false, endpoint.isReplyPubSubDomain()); } + @Test + public void customReplyQosSettings() { + JmsMessageEndpointManager endpoint = new JmsMessageEndpointManager(); + JmsActivationSpecConfig config = new JmsActivationSpecConfig(); + QosSettings settings = new QosSettings(1, 3, 5); + config.setReplyQosSettings(settings); + endpoint.setActivationSpecConfig(config); + assertNotNull(endpoint.getReplyQosSettings()); + assertEquals(1, endpoint.getReplyQosSettings().getDeliveryMode()); + assertEquals(3, endpoint.getReplyQosSettings().getPriority()); + assertEquals(5, endpoint.getReplyQosSettings().getTimeToLive()); + } + @Test public void isPubSubDomainWithNoConfig() { JmsMessageEndpointManager endpoint = new JmsMessageEndpointManager(); @@ -77,6 +92,14 @@ public class JmsMessageEndpointManagerTests { endpoint.isReplyPubSubDomain(); } + @Test + public void getReplyQosSettingsWithNoConfig() { + JmsMessageEndpointManager endpoint = new JmsMessageEndpointManager(); + + this.thrown.expect(IllegalStateException.class); // far from ideal + endpoint.getReplyQosSettings(); + } + @Test public void getMessageConverterNoConfig() { JmsMessageEndpointManager endpoint = new JmsMessageEndpointManager(); diff --git a/src/docs/asciidoc/integration.adoc b/src/docs/asciidoc/integration.adoc index 2d3f3f54f6..358726b944 100644 --- a/src/docs/asciidoc/integration.adoc +++ b/src/docs/asciidoc/integration.adoc @@ -2793,6 +2793,29 @@ example can be rewritten as follows: } ---- +Finally if you need to specify some QoS values for the response such as the priority or +the time to live, you can configure the `JmsListenerContainerFactory` accordingly: + +[source,java,indent=0] +[subs="verbatim,quotes"] +---- + @Configuration + @EnableJms + public class AppConfig { + + @Bean + public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { + DefaultJmsListenerContainerFactory factory = + new DefaultJmsListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory()); + QosSettings replyQosSettings = new ReplyQosSettings(); + replyQosSettings.setPriority(2); + replyQosSettings.setTimeToLive(10000); + factory.setReplyQosSettings(replyQosSettings); + return factory; + } + } +---- [[jms-namespace]]