Avoid JMSException in listener execution
This commit avoids throwing JMSException from the listener execution as this is not allowed per spec. Our SessionAwareMessageListener gives a callback that can throw JMSException and we have "abused" it so far. Typical message processing goes in those 3 steps: * Unmarshall the javax.jms.Message to the requested type * Invoke the actual user method (including processing of method arguments) * Send a reply message, if any Those three steps have been harmonized so that they don't throw a JMSException anymore. For the later case, introduced ReplyFailureException as a general exception indicating the reply message could not have been sent. Issue: SPR-11778
This commit is contained in:
parent
b0f0d2f289
commit
6560aed1c8
|
@ -221,14 +221,19 @@ public abstract class AbstractAdaptableMessageListener
|
|||
* @param message the JMS {@code Message}
|
||||
* @return the content of the message, to be passed into the
|
||||
* listener method as argument
|
||||
* @throws JMSException if thrown by JMS API methods
|
||||
* @throws MessageConversionException if the message could not be unmarshaled
|
||||
*/
|
||||
protected Object extractMessage(Message message) throws JMSException {
|
||||
MessageConverter converter = getMessageConverter();
|
||||
if (converter != null) {
|
||||
return converter.fromMessage(message);
|
||||
protected Object extractMessage(Message message) {
|
||||
try {
|
||||
MessageConverter converter = getMessageConverter();
|
||||
if (converter != null) {
|
||||
return converter.fromMessage(message);
|
||||
}
|
||||
return message;
|
||||
}
|
||||
catch (JMSException e) {
|
||||
throw new MessageConversionException("Could not unmarshal message", e);
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -237,22 +242,27 @@ public abstract class AbstractAdaptableMessageListener
|
|||
* @param result the result object to handle (never {@code null})
|
||||
* @param request the original request message
|
||||
* @param session the JMS Session to operate on (may be {@code null})
|
||||
* @throws JMSException if thrown by JMS API methods
|
||||
* @throws ReplyFailureException if the response message could not be sent
|
||||
* @see #buildMessage
|
||||
* @see #postProcessResponse
|
||||
* @see #getResponseDestination
|
||||
* @see #sendResponse
|
||||
*/
|
||||
protected void handleResult(Object result, Message request, Session session) throws JMSException {
|
||||
protected void handleResult(Object result, Message request, Session session) {
|
||||
if (session != null) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Listener method returned result [" + result +
|
||||
"] - generating response message for it");
|
||||
}
|
||||
Message response = buildMessage(session, result);
|
||||
postProcessResponse(request, response);
|
||||
Destination destination = getResponseDestination(request, response, session);
|
||||
sendResponse(session, destination, response);
|
||||
try {
|
||||
Message response = buildMessage(session, result);
|
||||
postProcessResponse(request, response);
|
||||
Destination destination = getResponseDestination(request, response, session);
|
||||
sendResponse(session, destination, response);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ReplyFailureException("Failed to send reply with payload '" + result + "'", e);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (logger.isWarnEnabled()) {
|
||||
|
|
|
@ -60,18 +60,36 @@ public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageLis
|
|||
|
||||
@Override
|
||||
public void onMessage(javax.jms.Message jmsMessage, Session session) throws JMSException {
|
||||
Message<?> message = toMessagingMessage(jmsMessage);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Processing [" + message + "]");
|
||||
}
|
||||
Object result = invokeHandler(jmsMessage, session, message);
|
||||
if (result != null) {
|
||||
handleResult(result, jmsMessage, session);
|
||||
}
|
||||
else {
|
||||
logger.trace("No result object given - no result to handle");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Message<?> toMessagingMessage(javax.jms.Message jmsMessage) {
|
||||
Map<String, Object> mappedHeaders = getHeaderMapper().toHeaders(jmsMessage);
|
||||
Object convertedObject = extractMessage(jmsMessage);
|
||||
MessageBuilder<Object> builder = (convertedObject instanceof org.springframework.messaging.Message) ?
|
||||
MessageBuilder.fromMessage((org.springframework.messaging.Message<Object>) convertedObject) :
|
||||
MessageBuilder.withPayload(convertedObject);
|
||||
return builder.copyHeadersIfAbsent(mappedHeaders).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException} with
|
||||
* a dedicated error message.
|
||||
*/
|
||||
private Object invokeHandler(javax.jms.Message jmsMessage, Session session, Message<?> message) {
|
||||
try {
|
||||
Message<?> message = toMessagingMessage(jmsMessage);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Processing [" + message + "]");
|
||||
}
|
||||
Object result = handlerMethod.invoke(message, jmsMessage, session);
|
||||
if (result != null) {
|
||||
handleResult(result, jmsMessage, session);
|
||||
}
|
||||
else {
|
||||
logger.trace("No result object given - no result to handle");
|
||||
}
|
||||
return handlerMethod.invoke(message, jmsMessage, session);
|
||||
}
|
||||
catch (MessagingException e) {
|
||||
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
|
||||
|
@ -83,16 +101,6 @@ public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageLis
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Message<?> toMessagingMessage(javax.jms.Message jmsMessage) throws JMSException {
|
||||
Map<String, Object> mappedHeaders = getHeaderMapper().toHeaders(jmsMessage);
|
||||
Object convertedObject = extractMessage(jmsMessage);
|
||||
MessageBuilder<Object> builder = (convertedObject instanceof org.springframework.messaging.Message) ?
|
||||
MessageBuilder.fromMessage((org.springframework.messaging.Message<Object>) convertedObject) :
|
||||
MessageBuilder.withPayload(convertedObject);
|
||||
return builder.copyHeadersIfAbsent(mappedHeaders).build();
|
||||
}
|
||||
|
||||
private String createMessagingErrorMessage(String description) {
|
||||
StringBuilder sb = new StringBuilder(description).append("\n")
|
||||
.append("Endpoint handler details:\n")
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.jms.listener.adapter;
|
||||
|
||||
import org.springframework.jms.JmsException;
|
||||
|
||||
/**
|
||||
* Exception to be thrown when the reply of a message failed to be sent.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 4.1
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class ReplyFailureException extends JmsException {
|
||||
|
||||
public ReplyFailureException(String msg, Throwable cause) {
|
||||
super(msg, cause);
|
||||
}
|
||||
}
|
|
@ -46,6 +46,7 @@ import org.springframework.jms.StubTextMessage;
|
|||
import org.springframework.jms.listener.DefaultMessageListenerContainer;
|
||||
import org.springframework.jms.listener.MessageListenerContainer;
|
||||
import org.springframework.jms.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.jms.listener.adapter.ReplyFailureException;
|
||||
import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
|
||||
import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter;
|
||||
import org.springframework.jms.support.JmsMessageHeaderAccessor;
|
||||
|
@ -275,7 +276,7 @@ public class MethodJmsListenerEndpointTests {
|
|||
Session session = mock(Session.class);
|
||||
given(session.createTextMessage("content")).willReturn(reply);
|
||||
|
||||
thrown.expect(ListenerExecutionFailedException.class);
|
||||
thrown.expect(ReplyFailureException.class);
|
||||
thrown.expectCause(Matchers.isA(InvalidDestinationException.class));
|
||||
listener.onMessage(createSimpleJmsTextMessage("content"), session);
|
||||
}
|
||||
|
|
|
@ -306,8 +306,10 @@ public class MessageListenerAdapterTests {
|
|||
};
|
||||
try {
|
||||
adapter.onMessage(sentTextMessage, session);
|
||||
fail("expected InvalidDestinationException");
|
||||
} catch(InvalidDestinationException ex) { /* expected */ }
|
||||
fail("expected CouldNotSendReplyException with InvalidDestinationException");
|
||||
} catch(ReplyFailureException ex) {
|
||||
assertEquals(InvalidDestinationException.class, ex.getCause().getClass());
|
||||
}
|
||||
|
||||
verify(responseTextMessage).setJMSCorrelationID(CORRELATION_ID);
|
||||
verify(delegate).handleMessage(sentTextMessage);
|
||||
|
@ -342,8 +344,10 @@ public class MessageListenerAdapterTests {
|
|||
};
|
||||
try {
|
||||
adapter.onMessage(sentTextMessage, session);
|
||||
fail("expected JMSException");
|
||||
} catch(JMSException ex) { /* expected */ }
|
||||
fail("expected CouldNotSendReplyException with JMSException");
|
||||
} catch(ReplyFailureException ex) {
|
||||
assertEquals(JMSException.class, ex.getCause().getClass());
|
||||
}
|
||||
|
||||
verify(responseTextMessage).setJMSCorrelationID(CORRELATION_ID);
|
||||
verify(messageProducer).close();
|
||||
|
@ -420,8 +424,10 @@ public class MessageListenerAdapterTests {
|
|||
adapter.setMessageConverter(null);
|
||||
try {
|
||||
adapter.onMessage(sentTextMessage, session);
|
||||
fail("expected MessageConversionException");
|
||||
} catch(MessageConversionException ex) { /* expected */ }
|
||||
fail("expected CouldNotSendReplyException with MessageConversionException");
|
||||
} catch(ReplyFailureException ex) {
|
||||
assertEquals(MessageConversionException.class, ex.getCause().getClass());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.springframework.jms.StubTextMessage;
|
|||
import org.springframework.jms.config.DefaultJmsHandlerMethodFactory;
|
||||
import org.springframework.jms.support.converter.JmsHeaders;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.converter.MessageConversionException;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
|
@ -63,7 +64,8 @@ public class MessagingMessageListenerAdapterTests {
|
|||
|
||||
Session session = mock(Session.class);
|
||||
given(session.createTextMessage("Response")).willReturn(new StubTextMessage("Response"));
|
||||
javax.jms.Message replyMessage = getSimpleInstance().buildMessage(session, result);
|
||||
MessagingMessageListenerAdapter listener = getSimpleInstance("echo", Message.class);
|
||||
javax.jms.Message replyMessage = listener.buildMessage(session, result);
|
||||
|
||||
verify(session).createTextMessage("Response");
|
||||
assertNotNull("reply should never be null", replyMessage);
|
||||
|
@ -73,8 +75,45 @@ public class MessagingMessageListenerAdapterTests {
|
|||
assertEquals("replyTo header not copied", replyTo, replyMessage.getJMSReplyTo());
|
||||
}
|
||||
|
||||
protected MessagingMessageListenerAdapter getSimpleInstance() {
|
||||
Method m = ReflectionUtils.findMethod(SampleBean.class, "echo", Message.class);
|
||||
@Test
|
||||
public void exceptionInListener() {
|
||||
javax.jms.Message message = new StubTextMessage("foo");
|
||||
Session session = mock(Session.class);
|
||||
MessagingMessageListenerAdapter listener = getSimpleInstance("fail", String.class);
|
||||
|
||||
try {
|
||||
listener.onMessage(message, session);
|
||||
fail("Should have thrown an exception");
|
||||
}
|
||||
catch (JMSException e) {
|
||||
fail("Should not have thrown a JMS exception");
|
||||
}
|
||||
catch (ListenerExecutionFailedException e) {
|
||||
assertEquals(IllegalArgumentException.class, e.getCause().getClass());
|
||||
assertEquals("Expected test exception", e.getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void exceptionInInvocation() {
|
||||
javax.jms.Message message = new StubTextMessage("foo");
|
||||
Session session = mock(Session.class);
|
||||
MessagingMessageListenerAdapter listener = getSimpleInstance("wrongParam", Integer.class);
|
||||
|
||||
try {
|
||||
listener.onMessage(message, session);
|
||||
fail("Should have thrown an exception");
|
||||
}
|
||||
catch (JMSException e) {
|
||||
fail("Should not have thrown a JMS exception");
|
||||
}
|
||||
catch (ListenerExecutionFailedException e) {
|
||||
assertEquals(MessageConversionException.class, e.getCause().getClass());
|
||||
}
|
||||
}
|
||||
|
||||
protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, Class... parameterTypes) {
|
||||
Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, parameterTypes);
|
||||
return createInstance(m);
|
||||
}
|
||||
|
||||
|
@ -97,5 +136,13 @@ public class MessagingMessageListenerAdapterTests {
|
|||
.setHeader(JmsHeaders.TYPE, "reply")
|
||||
.build();
|
||||
}
|
||||
|
||||
public void fail(String input) {
|
||||
throw new IllegalArgumentException("Expected test exception");
|
||||
}
|
||||
|
||||
public void wrongParam(Integer i) {
|
||||
throw new IllegalArgumentException("Should not have been called");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue