Runtime resolution of JMS reply destination
Add JmsResponse that can be used as return type of any JMS listener method to indicate not only the response but also the actual destination to which the reply should be sent. Issue: SPR-13133
This commit is contained in:
parent
210e10c657
commit
bd093eb6bf
|
@ -242,7 +242,7 @@ public abstract class AbstractAdaptableMessageListener
|
||||||
try {
|
try {
|
||||||
Message response = buildMessage(session, result);
|
Message response = buildMessage(session, result);
|
||||||
postProcessResponse(request, response);
|
postProcessResponse(request, response);
|
||||||
Destination destination = getResponseDestination(request, response, session);
|
Destination destination = getResponseDestination(request, response, session, result);
|
||||||
sendResponse(session, destination, response);
|
sendResponse(session, destination, response);
|
||||||
}
|
}
|
||||||
catch (Exception ex) {
|
catch (Exception ex) {
|
||||||
|
@ -266,21 +266,24 @@ public abstract class AbstractAdaptableMessageListener
|
||||||
* @see #setMessageConverter
|
* @see #setMessageConverter
|
||||||
*/
|
*/
|
||||||
protected Message buildMessage(Session session, Object result) throws JMSException {
|
protected Message buildMessage(Session session, Object result) throws JMSException {
|
||||||
|
Object content = (result instanceof JmsResponse
|
||||||
|
? ((JmsResponse) result).getResponse() : result);
|
||||||
|
|
||||||
MessageConverter converter = getMessageConverter();
|
MessageConverter converter = getMessageConverter();
|
||||||
if (converter != null) {
|
if (converter != null) {
|
||||||
if (result instanceof org.springframework.messaging.Message) {
|
if (content instanceof org.springframework.messaging.Message) {
|
||||||
return this.messagingMessageConverter.toMessage(result, session);
|
return this.messagingMessageConverter.toMessage(content, session);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return converter.toMessage(result, session);
|
return converter.toMessage(content, session);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if (!(result instanceof Message)) {
|
if (!(content instanceof Message)) {
|
||||||
throw new MessageConversionException(
|
throw new MessageConversionException(
|
||||||
"No MessageConverter specified - cannot handle message [" + result + "]");
|
"No MessageConverter specified - cannot handle message [" + content + "]");
|
||||||
}
|
}
|
||||||
return (Message) result;
|
return (Message) content;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,6 +305,18 @@ public abstract class AbstractAdaptableMessageListener
|
||||||
response.setJMSCorrelationID(correlation);
|
response.setJMSCorrelationID(correlation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Destination getResponseDestination(Message request, Message response, Session session, Object result)
|
||||||
|
throws JMSException {
|
||||||
|
if (result instanceof JmsResponse) {
|
||||||
|
JmsResponse jmsResponse = (JmsResponse) result;
|
||||||
|
Destination destination = jmsResponse.resolveDestination(getDestinationResolver(), session);
|
||||||
|
if (destination != null) {
|
||||||
|
return destination;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return getResponseDestination(request, response, session);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine a response destination for the given message.
|
* Determine a response destination for the given message.
|
||||||
* <p>The default implementation first checks the JMS Reply-To
|
* <p>The default implementation first checks the JMS Reply-To
|
||||||
|
|
|
@ -0,0 +1,140 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2015 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.jms.listener.adapter;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.springframework.jms.support.destination.DestinationResolver;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return type of any JMS listener method used to indicate the actual response destination
|
||||||
|
* alongside the response itself. Typically used when said destination needs to be
|
||||||
|
* computed at runtime.
|
||||||
|
* <p>
|
||||||
|
* The example below sends a response with the content of the {@code result} argument to
|
||||||
|
* the {@code queueOut Queue}:
|
||||||
|
*
|
||||||
|
* <pre class="code">
|
||||||
|
* package com.acme.foo;
|
||||||
|
*
|
||||||
|
* public class MyService {
|
||||||
|
* @JmsListener
|
||||||
|
* public JmsResponse process(String msg) {
|
||||||
|
* // process incoming message
|
||||||
|
* return JmsResponse.forQueue(result, "queueOut");
|
||||||
|
* }
|
||||||
|
* }</pre>
|
||||||
|
*
|
||||||
|
* If the destination does not need to be computed at runtime,
|
||||||
|
* {@link org.springframework.messaging.handler.annotation.SendTo @SendTo} is the
|
||||||
|
* recommended declarative approach.
|
||||||
|
*
|
||||||
|
* @author Stephane Nicoll
|
||||||
|
* @since 4.2
|
||||||
|
* @see org.springframework.jms.annotation.JmsListener
|
||||||
|
* @see org.springframework.messaging.handler.annotation.SendTo
|
||||||
|
*/
|
||||||
|
public class JmsResponse {
|
||||||
|
|
||||||
|
private final Object response;
|
||||||
|
|
||||||
|
private final Object destination;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new instance
|
||||||
|
* @param response the content of the result
|
||||||
|
* @param destination the destination
|
||||||
|
*/
|
||||||
|
protected JmsResponse(Object response, Object destination) {
|
||||||
|
Assert.notNull(response, "Result must not be null");
|
||||||
|
this.response = response;
|
||||||
|
this.destination = destination;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link JmsResponse} targeting the queue with the specified name.
|
||||||
|
*/
|
||||||
|
public static JmsResponse forQueue(Object result, String queueName) {
|
||||||
|
Assert.notNull(queueName, "Queue name must not be null");
|
||||||
|
return new JmsResponse(result, new DestinationNameHolder(queueName, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link JmsResponse} targeting the topic with the specified name.
|
||||||
|
*/
|
||||||
|
public static JmsResponse forTopic(Object result, String topicName) {
|
||||||
|
Assert.notNull(topicName, "Topic name must not be null");
|
||||||
|
return new JmsResponse(result, new DestinationNameHolder(topicName, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link JmsResponse} targeting the specified {@link Destination}.
|
||||||
|
*/
|
||||||
|
public static JmsResponse forDestination(Object result, Destination destination) {
|
||||||
|
Assert.notNull(destination, "Destination must not be null");
|
||||||
|
return new JmsResponse(result, destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Object getResponse() {
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Destination resolveDestination(DestinationResolver destinationResolver, Session session)
|
||||||
|
throws JMSException {
|
||||||
|
|
||||||
|
if (this.destination instanceof Destination) {
|
||||||
|
return (Destination) this.destination;
|
||||||
|
}
|
||||||
|
if (this.destination instanceof DestinationNameHolder) {
|
||||||
|
DestinationNameHolder nameHolder = (DestinationNameHolder) this.destination;
|
||||||
|
return destinationResolver.resolveDestinationName(session,
|
||||||
|
nameHolder.destinationName, nameHolder.pubSubDomain);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "JmsResponse{" + "response=" + this.response + ", destination=" + this.destination + '}';
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal class combining a destination name
|
||||||
|
* and its target destination type (queue or topic).
|
||||||
|
*/
|
||||||
|
protected static class DestinationNameHolder {
|
||||||
|
private final String destinationName;
|
||||||
|
|
||||||
|
private final boolean pubSubDomain;
|
||||||
|
|
||||||
|
public DestinationNameHolder(String destinationName, boolean pubSubDomain) {
|
||||||
|
this.destinationName = destinationName;
|
||||||
|
this.pubSubDomain = pubSubDomain;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return this.destinationName + "{" + "pubSubDomain=" + this.pubSubDomain + '}';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,84 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2015 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.jms.listener.adapter;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
import org.springframework.jms.support.destination.DestinationResolver;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.mockito.BDDMockito.given;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Stephane Nicoll
|
||||||
|
*/
|
||||||
|
public class JmsResponseTests {
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void destinationDoesNotUseDestinationResolver() throws JMSException {
|
||||||
|
Destination destination = mock(Destination.class);
|
||||||
|
Destination actual = JmsResponse.forDestination("foo", destination).resolveDestination(null, null);
|
||||||
|
assertSame(destination, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void resolveDestinationForQueue() throws JMSException {
|
||||||
|
Session session = mock(Session.class);
|
||||||
|
DestinationResolver destinationResolver = mock(DestinationResolver.class);
|
||||||
|
Destination destination = mock(Destination.class);
|
||||||
|
|
||||||
|
given(destinationResolver.resolveDestinationName(session, "myQueue", false)).willReturn(destination);
|
||||||
|
JmsResponse jmsResponse = JmsResponse.forQueue("foo", "myQueue");
|
||||||
|
Destination actual = jmsResponse.resolveDestination(destinationResolver, session);
|
||||||
|
assertSame(destination, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void createWithNulResponse() {
|
||||||
|
thrown.expect(IllegalArgumentException.class);
|
||||||
|
JmsResponse.forQueue(null, "myQueue");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void createWithNullQueueName() {
|
||||||
|
thrown.expect(IllegalArgumentException.class);
|
||||||
|
JmsResponse.forQueue("foo", null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void createWithNullTopicName() {
|
||||||
|
thrown.expect(IllegalArgumentException.class);
|
||||||
|
JmsResponse.forTopic("foo", null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void createWithNulDestination() {
|
||||||
|
thrown.expect(IllegalArgumentException.class);
|
||||||
|
JmsResponse.forDestination("foo", null);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -21,8 +21,11 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -146,6 +149,97 @@ public class MessagingMessageListenerAdapterTests {
|
||||||
assertEquals("Response", ((TextMessage) replyMessage).getText());
|
assertEquals("Response", ((TextMessage) replyMessage).getText());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void replyPayloadToQueue() throws JMSException {
|
||||||
|
Message<String> request = MessageBuilder.withPayload("Response").build();
|
||||||
|
|
||||||
|
Session session = mock(Session.class);
|
||||||
|
Queue replyDestination = mock(Queue.class);
|
||||||
|
given(session.createQueue("queueOut")).willReturn(replyDestination);
|
||||||
|
|
||||||
|
|
||||||
|
MessageProducer messageProducer = mock(MessageProducer.class);
|
||||||
|
TextMessage responseMessage = mock(TextMessage.class);
|
||||||
|
given(session.createTextMessage("Response")).willReturn(responseMessage);
|
||||||
|
given(session.createProducer(replyDestination)).willReturn(messageProducer);
|
||||||
|
|
||||||
|
MessagingMessageListenerAdapter listener = getPayloadInstance(request, "replyPayloadToQueue", Message.class);
|
||||||
|
listener.onMessage(mock(javax.jms.Message.class), session);
|
||||||
|
|
||||||
|
|
||||||
|
verify(session).createQueue("queueOut");
|
||||||
|
verify(session).createTextMessage("Response");
|
||||||
|
verify(messageProducer).send(responseMessage);
|
||||||
|
verify(messageProducer).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void replyPayloadToTopic() throws JMSException {
|
||||||
|
Message<String> request = MessageBuilder.withPayload("Response").build();
|
||||||
|
|
||||||
|
Session session = mock(Session.class);
|
||||||
|
Topic replyDestination = mock(Topic.class);
|
||||||
|
given(session.createTopic("topicOut")).willReturn(replyDestination);
|
||||||
|
|
||||||
|
|
||||||
|
MessageProducer messageProducer = mock(MessageProducer.class);
|
||||||
|
TextMessage responseMessage = mock(TextMessage.class);
|
||||||
|
given(session.createTextMessage("Response")).willReturn(responseMessage);
|
||||||
|
given(session.createProducer(replyDestination)).willReturn(messageProducer);
|
||||||
|
|
||||||
|
MessagingMessageListenerAdapter listener = getPayloadInstance(request, "replyPayloadToTopic", Message.class);
|
||||||
|
listener.onMessage(mock(javax.jms.Message.class), session);
|
||||||
|
|
||||||
|
|
||||||
|
verify(session).createTopic("topicOut");
|
||||||
|
verify(session).createTextMessage("Response");
|
||||||
|
verify(messageProducer).send(responseMessage);
|
||||||
|
verify(messageProducer).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void replyPayloadToDestination() throws JMSException {
|
||||||
|
Queue replyDestination = mock(Queue.class);
|
||||||
|
Message<String> request = MessageBuilder.withPayload("Response")
|
||||||
|
.setHeader("destination", replyDestination).build();
|
||||||
|
|
||||||
|
Session session = mock(Session.class);
|
||||||
|
MessageProducer messageProducer = mock(MessageProducer.class);
|
||||||
|
TextMessage responseMessage = mock(TextMessage.class);
|
||||||
|
given(session.createTextMessage("Response")).willReturn(responseMessage);
|
||||||
|
given(session.createProducer(replyDestination)).willReturn(messageProducer);
|
||||||
|
|
||||||
|
MessagingMessageListenerAdapter listener = getPayloadInstance(request, "replyPayloadToDestination", Message.class);
|
||||||
|
listener.onMessage(mock(javax.jms.Message.class), session);
|
||||||
|
|
||||||
|
verify(session, times(0)).createQueue(anyString());
|
||||||
|
verify(session).createTextMessage("Response");
|
||||||
|
verify(messageProducer).send(responseMessage);
|
||||||
|
verify(messageProducer).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void replyPayloadNoDestination() throws JMSException {
|
||||||
|
Queue replyDestination = mock(Queue.class);
|
||||||
|
Message<String> request = MessageBuilder.withPayload("Response").build();
|
||||||
|
|
||||||
|
Session session = mock(Session.class);
|
||||||
|
MessageProducer messageProducer = mock(MessageProducer.class);
|
||||||
|
TextMessage responseMessage = mock(TextMessage.class);
|
||||||
|
given(session.createTextMessage("Response")).willReturn(responseMessage);
|
||||||
|
given(session.createProducer(replyDestination)).willReturn(messageProducer);
|
||||||
|
|
||||||
|
MessagingMessageListenerAdapter listener =
|
||||||
|
getPayloadInstance(request, "replyPayloadNoDestination", Message.class);
|
||||||
|
listener.setDefaultResponseDestination(replyDestination);
|
||||||
|
listener.onMessage(mock(javax.jms.Message.class), session);
|
||||||
|
|
||||||
|
verify(session, times(0)).createQueue(anyString());
|
||||||
|
verify(session).createTextMessage("Response");
|
||||||
|
verify(messageProducer).send(responseMessage);
|
||||||
|
verify(messageProducer).close();
|
||||||
|
}
|
||||||
|
|
||||||
protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, Class... parameterTypes) {
|
protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, Class... parameterTypes) {
|
||||||
Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, parameterTypes);
|
Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, parameterTypes);
|
||||||
return createInstance(m);
|
return createInstance(m);
|
||||||
|
@ -157,6 +251,19 @@ public class MessagingMessageListenerAdapterTests {
|
||||||
return adapter;
|
return adapter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected MessagingMessageListenerAdapter getPayloadInstance(final Object payload,
|
||||||
|
String methodName, Class... parameterTypes) {
|
||||||
|
Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, parameterTypes);
|
||||||
|
MessagingMessageListenerAdapter adapter = new MessagingMessageListenerAdapter() {
|
||||||
|
@Override
|
||||||
|
protected Object extractMessage(javax.jms.Message message) {
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
adapter.setHandlerMethod(factory.createInvocableHandlerMethod(sample, m));
|
||||||
|
return adapter;
|
||||||
|
}
|
||||||
|
|
||||||
private void initializeFactory(DefaultMessageHandlerMethodFactory factory) {
|
private void initializeFactory(DefaultMessageHandlerMethodFactory factory) {
|
||||||
factory.setBeanFactory(new StaticListableBeanFactory());
|
factory.setBeanFactory(new StaticListableBeanFactory());
|
||||||
factory.afterPropertiesSet();
|
factory.afterPropertiesSet();
|
||||||
|
@ -178,6 +285,23 @@ public class MessagingMessageListenerAdapterTests {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public JmsResponse replyPayloadToQueue(Message<String> input) {
|
||||||
|
return JmsResponse.forQueue(input.getPayload(), "queueOut");
|
||||||
|
}
|
||||||
|
|
||||||
|
public JmsResponse replyPayloadToTopic(Message<String> input) {
|
||||||
|
return JmsResponse.forTopic(input.getPayload(), "topicOut");
|
||||||
|
}
|
||||||
|
|
||||||
|
public JmsResponse replyPayloadToDestination(Message<String> input) {
|
||||||
|
return JmsResponse.forDestination(input.getPayload(),
|
||||||
|
input.getHeaders().get("destination", Destination.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
public JmsResponse replyPayloadNoDestination(Message<String> input) {
|
||||||
|
return new JmsResponse(input.getPayload(), null);
|
||||||
|
}
|
||||||
|
|
||||||
public void fail(String input) {
|
public void fail(String input) {
|
||||||
throw new IllegalArgumentException("Expected test exception");
|
throw new IllegalArgumentException("Expected test exception");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue