Refine destination semantics for msg-handling methods

After this change, annotated message handling methods configured to use
a destination prefix (e.g. "/app") no longer have to include the prefix
in their mapping. For example if a client sends a message to "/app/foo"
the annotated methods should be mapped with @MessageMapping("/foo").
This commit is contained in:
Rossen Stoyanchev 2013-09-03 11:04:00 -04:00
parent e1a46bb57a
commit 0ac6998e60
20 changed files with 325 additions and 126 deletions

View File

@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -149,7 +150,10 @@ public final class MessageHeaders implements Map<String, Object>, Serializable {
@Override @Override
public String toString() { public String toString() {
return this.headers.toString(); Map<String, Object> map = new LinkedHashMap<String, Object>(this.headers);
map.put(ID, map.remove(ID)); // remove and add again at the end
map.put(TIMESTAMP, map.remove(TIMESTAMP));
return map.toString();
} }
/* /*

View File

@ -19,6 +19,8 @@ package org.springframework.messaging.handler.method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -30,6 +32,8 @@ import org.springframework.util.Assert;
*/ */
public class HandlerMethodReturnValueHandlerComposite implements HandlerMethodReturnValueHandler { public class HandlerMethodReturnValueHandlerComposite implements HandlerMethodReturnValueHandler {
private static Log logger = LogFactory.getLog(HandlerMethodReturnValueHandlerComposite.class);
private final List<HandlerMethodReturnValueHandler> returnValueHandlers = new ArrayList<HandlerMethodReturnValueHandler>(); private final List<HandlerMethodReturnValueHandler> returnValueHandlers = new ArrayList<HandlerMethodReturnValueHandler>();
@ -61,6 +65,9 @@ public class HandlerMethodReturnValueHandlerComposite implements HandlerMethodRe
private HandlerMethodReturnValueHandler getReturnValueHandler(MethodParameter returnType) { private HandlerMethodReturnValueHandler getReturnValueHandler(MethodParameter returnType) {
for (HandlerMethodReturnValueHandler handler : this.returnValueHandlers) { for (HandlerMethodReturnValueHandler handler : this.returnValueHandlers) {
if (handler.supportsReturnType(returnType)) { if (handler.supportsReturnType(returnType)) {
if (logger.isTraceEnabled()) {
logger.trace("Processing return value with " + handler);
}
return handler; return handler;
} }
} }
@ -72,7 +79,7 @@ public class HandlerMethodReturnValueHandlerComposite implements HandlerMethodRe
throws Exception { throws Exception {
HandlerMethodReturnValueHandler handler = getReturnValueHandler(returnType); HandlerMethodReturnValueHandler handler = getReturnValueHandler(returnType);
Assert.notNull(handler, "Unknown return value type [" + returnType.getParameterType().getName() + "]"); Assert.notNull(handler, "No handler for return value type [" + returnType.getParameterType().getName() + "]");
handler.handleReturnValue(returnValue, returnType, message); handler.handleReturnValue(returnValue, returnType, message);
} }

View File

@ -104,16 +104,17 @@ public class InvocableHandlerMethod extends HandlerMethod {
Object[] args = getMethodArgumentValues(message, providedArgs); Object[] args = getMethodArgumentValues(message, providedArgs);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
StringBuilder builder = new StringBuilder("Invoking ["); StringBuilder sb = new StringBuilder("Invoking [");
builder.append(this.getMethod().getName()).append("] method with arguments "); sb.append(this.getBeanType().getSimpleName()).append(".");
builder.append(Arrays.asList(args)); sb.append(this.getMethod().getName()).append("] method with arguments ");
logger.trace(builder.toString()); sb.append(Arrays.asList(args));
logger.trace(sb.toString());
} }
Object returnValue = invoke(args); Object returnValue = invoke(args);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Method [" + this.getMethod().getName() + "] returned [" + returnValue + "]"); logger.trace("Method returned [" + returnValue + "]");
} }
return returnValue; return returnValue;

View File

@ -16,7 +16,10 @@
package org.springframework.messaging.simp.annotation.support; package org.springframework.messaging.simp.annotation.support;
import java.lang.annotation.Annotation;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.core.MessagePostProcessor; import org.springframework.messaging.core.MessagePostProcessor;
@ -27,6 +30,7 @@ import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.annotation.ReplyToUser; import org.springframework.messaging.simp.annotation.ReplyToUser;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/** /**
@ -46,17 +50,23 @@ public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValue
private final SimpMessageSendingOperations messagingTemplate; private final SimpMessageSendingOperations messagingTemplate;
private final boolean annotationRequired;
public ReplyToMethodReturnValueHandler(SimpMessageSendingOperations messagingTemplate) {
public ReplyToMethodReturnValueHandler(SimpMessageSendingOperations messagingTemplate, boolean annotationRequired) {
Assert.notNull(messagingTemplate, "messagingTemplate is required"); Assert.notNull(messagingTemplate, "messagingTemplate is required");
this.messagingTemplate = messagingTemplate; this.messagingTemplate = messagingTemplate;
this.annotationRequired = annotationRequired;
} }
@Override @Override
public boolean supportsReturnType(MethodParameter returnType) { public boolean supportsReturnType(MethodParameter returnType) {
return ((returnType.getMethodAnnotation(ReplyTo.class) != null) if ((returnType.getMethodAnnotation(ReplyTo.class) != null) ||
|| (returnType.getMethodAnnotation(ReplyToUser.class) != null)); (returnType.getMethodAnnotation(ReplyToUser.class) != null)) {
return true;
}
return (!this.annotationRequired);
} }
@Override @Override
@ -72,23 +82,32 @@ public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValue
String sessionId = inputHeaders.getSessionId(); String sessionId = inputHeaders.getSessionId();
MessagePostProcessor postProcessor = new SessionHeaderPostProcessor(sessionId); MessagePostProcessor postProcessor = new SessionHeaderPostProcessor(sessionId);
ReplyTo replyTo = returnType.getMethodAnnotation(ReplyTo.class);
if (replyTo != null) {
for (String destination : replyTo.value()) {
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
}
}
ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class); ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class);
if (replyToUser != null) { if (replyToUser != null) {
if (inputHeaders.getUser() == null) { if (inputHeaders.getUser() == null) {
throw new MissingSessionUserException(inputMessage); throw new MissingSessionUserException(inputMessage);
} }
String user = inputHeaders.getUser().getName(); String user = inputHeaders.getUser().getName();
for (String destination : replyToUser.value()) { for (String destination : getDestinations(replyToUser, inputHeaders.getDestination())) {
this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, postProcessor); this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, postProcessor);
} }
return;
} }
ReplyTo replyTo = returnType.getMethodAnnotation(ReplyTo.class);
if (replyTo != null) {
for (String destination : getDestinations(replyTo, inputHeaders.getDestination())) {
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
}
return;
}
this.messagingTemplate.convertAndSend(inputHeaders.getDestination(), returnValue, postProcessor);
}
private String[] getDestinations(Annotation annot, String inputDestination) {
String[] destinations = (String[]) AnnotationUtils.getValue(annot);
return ObjectUtils.isEmpty(destinations) ? new String[] { inputDestination } : destinations;
} }
@ -107,4 +126,10 @@ public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValue
return MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build(); return MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
} }
} }
@Override
public String toString() {
return "ReplyToMethodReturnValueHandler [annotationRequired=" + annotationRequired + "]";
}
} }

View File

@ -47,16 +47,37 @@ public class MessageBrokerConfigurer {
this.webSocketResponseChannel = webSocketResponseChannel; this.webSocketResponseChannel = webSocketResponseChannel;
} }
/**
* Enable a simple message broker and configure one or more prefixes to filter
* destinations targeting the broker (e.g. destinations prefixed with "/topic").
*/
public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) { public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) {
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, destinationPrefixes); this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, destinationPrefixes);
return this.simpleBroker; return this.simpleBroker;
} }
/**
* Enable a STOMP broker relay and configure the destination prefixes supported by the
* message broker. Check the STOMP documentation of the message broker for supported
* destinations.
*/
public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) { public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) {
this.stompRelay = new StompBrokerRelayRegistration(this.webSocketResponseChannel, destinationPrefixes); this.stompRelay = new StompBrokerRelayRegistration(this.webSocketResponseChannel, destinationPrefixes);
return this.stompRelay; return this.stompRelay;
} }
/**
* Configure one or more prefixes to filter destinations targeting annotated
* application methods. For example destinations prefixed with "/app" may be processed
* by annotated application methods while other destinations may target the message
* broker (e.g. "/topic", "/queue").
* <p>
* When messages are processed, the matching prefix is removed from the destination in
* order to form the lookup path. This means annotations should not contain the
* destination prefix.
* <p>
* Prefixes that do not have a trailing slash will have one automatically appended.
*/
public MessageBrokerConfigurer setAnnotationMethodDestinationPrefixes(String... destinationPrefixes) { public MessageBrokerConfigurer setAnnotationMethodDestinationPrefixes(String... destinationPrefixes) {
this.annotationMethodDestinationPrefixes = destinationPrefixes; this.annotationMethodDestinationPrefixes = destinationPrefixes;
return this; return this;

View File

@ -141,7 +141,7 @@ public abstract class AbstractBrokerMessageHandler
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Processing message: " + message); logger.trace("Message " + message);
} }
handleMessageInternal(message); handleMessageInternal(message);

View File

@ -56,7 +56,8 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
return; return;
} }
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Subscribe request: " + message); logger.debug("Adding subscription id=" + headers.getSubscriptionId()
+ ", destination=" + headers.getDestination());
} }
addSubscriptionInternal(sessionId, subscriptionId, destination, message); addSubscriptionInternal(sessionId, subscriptionId, destination, message);
} }

View File

@ -60,6 +60,7 @@ import org.springframework.messaging.simp.annotation.UnsubscribeEvent;
import org.springframework.messaging.simp.annotation.support.PrincipalMethodArgumentResolver; import org.springframework.messaging.simp.annotation.support.PrincipalMethodArgumentResolver;
import org.springframework.messaging.simp.annotation.support.ReplyToMethodReturnValueHandler; import org.springframework.messaging.simp.annotation.support.ReplyToMethodReturnValueHandler;
import org.springframework.messaging.simp.annotation.support.SubscriptionMethodReturnValueHandler; import org.springframework.messaging.simp.annotation.support.SubscriptionMethodReturnValueHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.converter.MessageConverter; import org.springframework.messaging.support.converter.MessageConverter;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -80,7 +81,7 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
private final SimpMessageSendingOperations webSocketResponseTemplate; private final SimpMessageSendingOperations webSocketResponseTemplate;
private Collection<String> destinationPrefixes; private Collection<String> destinationPrefixes = new ArrayList<String>();
private MessageConverter<?> messageConverter; private MessageConverter<?> messageConverter;
@ -117,9 +118,29 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
this.webSocketResponseTemplate = new SimpMessagingTemplate(webSocketResponseChannel); this.webSocketResponseTemplate = new SimpMessagingTemplate(webSocketResponseChannel);
} }
/**
* Configure one or more prefixes to filter destinations targeting annotated
* application methods. For example destinations prefixed with "/app" may be processed
* by annotated application methods while other destinations may target the message
* broker (e.g. "/topic", "/queue").
* <p>
* When messages are processed, the matching prefix is removed from the destination in
* order to form the lookup path. This means annotations should not contain the
* destination prefix.
* <p>
* Prefixes that do not have a trailing slash will have one automatically appended.
*/
public void setDestinationPrefixes(Collection<String> destinationPrefixes) { public void setDestinationPrefixes(Collection<String> destinationPrefixes) {
this.destinationPrefixes = destinationPrefixes; this.destinationPrefixes.clear();
if (destinationPrefixes != null) {
for (String prefix : destinationPrefixes) {
prefix = prefix.trim();
if (!prefix.endsWith("/")) {
prefix += "/";
}
this.destinationPrefixes.add(prefix);
}
}
} }
public Collection<String> getDestinationPrefixes() { public Collection<String> getDestinationPrefixes() {
@ -180,14 +201,17 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
this.argumentResolvers.addResolver(new MessageBodyMethodArgumentResolver(this.messageConverter)); this.argumentResolvers.addResolver(new MessageBodyMethodArgumentResolver(this.messageConverter));
// Annotation-based return value types // Annotation-based return value types
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.brokerTemplate)); this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.brokerTemplate, true));
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketResponseTemplate)); this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketResponseTemplate));
// custom return value types // custom return value types
this.returnValueHandlers.addHandlers(this.customReturnValueHandlers); this.returnValueHandlers.addHandlers(this.customReturnValueHandlers);
// catch-all
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.brokerTemplate, false));
} }
protected void initHandlerMethods() { protected final void initHandlerMethods() {
String[] beanNames = this.applicationContext.getBeanNamesForType(Object.class); String[] beanNames = this.applicationContext.getBeanNamesForType(Object.class);
for (String beanName : beanNames) { for (String beanName : beanNames) {
if (isHandler(this.applicationContext.getType(beanName))){ if (isHandler(this.applicationContext.getType(beanName))){
@ -200,26 +224,20 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
return (AnnotationUtils.findAnnotation(beanType, Controller.class) != null); return (AnnotationUtils.findAnnotation(beanType, Controller.class) != null);
} }
protected void detectHandlerMethods(Object handler) { protected final void detectHandlerMethods(Object handler) {
Class<?> handlerType = (handler instanceof String) ? Class<?> handlerType = (handler instanceof String) ?
this.applicationContext.getType((String) handler) : handler.getClass(); this.applicationContext.getType((String) handler) : handler.getClass();
final Class<?> userType = ClassUtils.getUserClass(handlerType); handlerType = ClassUtils.getUserClass(handlerType);
initHandlerMethods(handler, userType, MessageMapping.class, initHandlerMethods(handler, handlerType, MessageMapping.class, this.messageMethods);
new MessageMappingInfoCreator(), this.messageMethods); initHandlerMethods(handler, handlerType, SubscribeEvent.class, this.subscribeMethods);
initHandlerMethods(handler, handlerType, UnsubscribeEvent.class, this.unsubscribeMethods);
initHandlerMethods(handler, userType, SubscribeEvent.class,
new SubscribeMappingInfoCreator(), this.subscribeMethods);
initHandlerMethods(handler, userType, UnsubscribeEvent.class,
new UnsubscribeMappingInfoCreator(), this.unsubscribeMethods);
} }
private <A extends Annotation> void initHandlerMethods(Object handler, Class<?> handlerType, private <A extends Annotation> void initHandlerMethods(Object handler, Class<?> handlerType,
final Class<A> annotationType, MappingInfoCreator<A> mappingInfoCreator, final Class<A> annotationType, Map<MappingInfo, HandlerMethod> handlerMethods) {
Map<MappingInfo, HandlerMethod> handlerMethods) {
Set<Method> methods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() { Set<Method> methods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() {
@Override @Override
@ -230,12 +248,25 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
for (Method method : methods) { for (Method method : methods) {
A annotation = AnnotationUtils.findAnnotation(method, annotationType); A annotation = AnnotationUtils.findAnnotation(method, annotationType);
HandlerMethod hm = createHandlerMethod(handler, method); String[] destinations = (String[]) AnnotationUtils.getValue(annotation);
handlerMethods.put(mappingInfoCreator.create(annotation), hm); MappingInfo mapping = new MappingInfo(destinations);
HandlerMethod newHandlerMethod = createHandlerMethod(handler, method);
HandlerMethod oldHandlerMethod = handlerMethods.get(mapping);
if (oldHandlerMethod != null && !oldHandlerMethod.equals(newHandlerMethod)) {
throw new IllegalStateException("Ambiguous mapping found. Cannot map '" + newHandlerMethod.getBean()
+ "' bean method \n" + newHandlerMethod + "\nto " + mapping + ": There is already '"
+ oldHandlerMethod.getBean() + "' bean method\n" + oldHandlerMethod + " mapped.");
}
handlerMethods.put(mapping, newHandlerMethod);
if (logger.isInfoEnabled()) {
logger.info("Mapped \"@" + annotationType.getSimpleName()
+ " " + mapping + "\" onto " + newHandlerMethod);
}
} }
} }
protected HandlerMethod createHandlerMethod(Object handler, Method method) { private HandlerMethod createHandlerMethod(Object handler, Method method) {
HandlerMethod handlerMethod; HandlerMethod handlerMethod;
if (handler instanceof String) { if (handler instanceof String) {
String beanName = (String) handler; String beanName = (String) handler;
@ -264,22 +295,27 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
} }
} }
private void handleMessageInternal(final Message<?> message, Map<MappingInfo, HandlerMethod> handlerMethods) { private void handleMessageInternal(Message<?> message, Map<MappingInfo, HandlerMethod> handlerMethods) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
String destination = headers.getDestination();
if (!checkDestinationPrefix(destination)) {
return;
}
HandlerMethod match = getHandlerMethod(destination, handlerMethods);
if (match == null) {
return;
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Processing message: " + message); logger.trace("Message " + message);
}
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
String lookupPath = getLookupPath(headers.getDestination());
if (lookupPath == null) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring message with destination " + headers.getDestination());
}
return;
}
HandlerMethod match = getHandlerMethod(lookupPath, handlerMethods);
if (match == null) {
if (logger.isTraceEnabled()) {
logger.trace("No matching method, lookup path " + lookupPath);
}
return;
} }
HandlerMethod handlerMethod = match.createWithResolvedBean(); HandlerMethod handlerMethod = match.createWithResolvedBean();
@ -288,6 +324,9 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers); invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
try { try {
headers.setDestination(lookupPath);
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
Object returnValue = invocableHandlerMethod.invoke(message); Object returnValue = invocableHandlerMethod.invoke(message);
MethodParameter returnType = handlerMethod.getReturnType(); MethodParameter returnType = handlerMethod.getReturnType();
@ -305,16 +344,18 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
} }
} }
private boolean checkDestinationPrefix(String destination) { private String getLookupPath(String destination) {
if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) { if (destination != null) {
return true; if (CollectionUtils.isEmpty(this.destinationPrefixes)) {
} return destination;
for (String prefix : this.destinationPrefixes) { }
if (destination.startsWith(prefix)) { for (String prefix : this.destinationPrefixes) {
return true; if (destination.startsWith(prefix)) {
return destination.substring(prefix.length() - 1);
}
} }
} }
return false; return null;
} }
private void invokeExceptionHandler(Message<?> message, HandlerMethod handlerMethod, Exception ex) { private void invokeExceptionHandler(Message<?> message, HandlerMethod handlerMethod, Exception ex) {
@ -365,49 +406,38 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
private static class MappingInfo { private static class MappingInfo {
private final List<String> destinations; private final String[] destinations;
public MappingInfo(List<String> destinations) { public MappingInfo(String[] destinations) {
Assert.notNull(destinations, "No destinations");
this.destinations = destinations; this.destinations = destinations;
} }
public List<String> getDestinations() { public String[] getDestinations() {
return this.destinations; return this.destinations;
} }
@Override
public int hashCode() {
return Arrays.hashCode(this.destinations);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o != null && getClass().equals(o.getClass())) {
MappingInfo other = (MappingInfo) o;
return Arrays.equals(destinations, other.getDestinations());
}
return false;
}
@Override @Override
public String toString() { public String toString() {
return "MappingInfo [destinations=" + this.destinations + "]"; return "[destinations=" + Arrays.toString(this.destinations) + "]";
}
}
private interface MappingInfoCreator<A extends Annotation> {
MappingInfo create(A annotation);
}
private static class MessageMappingInfoCreator implements MappingInfoCreator<MessageMapping> {
@Override
public MappingInfo create(MessageMapping annotation) {
return new MappingInfo(Arrays.asList(annotation.value()));
}
}
private static class SubscribeMappingInfoCreator implements MappingInfoCreator<SubscribeEvent> {
@Override
public MappingInfo create(SubscribeEvent annotation) {
return new MappingInfo(Arrays.asList(annotation.value()));
}
}
private static class UnsubscribeMappingInfoCreator implements MappingInfoCreator<UnsubscribeEvent> {
@Override
public MappingInfo create(UnsubscribeEvent annotation) {
return new MappingInfo(Arrays.asList(annotation.value()));
} }
} }

View File

@ -80,34 +80,27 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
String destination = headers.getDestination(); String destination = headers.getDestination();
if (!checkDestinationPrefix(destination)) { if (!checkDestinationPrefix(destination)) {
if (logger.isTraceEnabled()) {
logger.trace("Ingoring message with destination " + destination);
}
return; return;
} }
if (SimpMessageType.SUBSCRIBE.equals(messageType)) { if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
preProcessMessage(message);
this.subscriptionRegistry.registerSubscription(message); this.subscriptionRegistry.registerSubscription(message);
} }
else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) { else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
preProcessMessage(message);
this.subscriptionRegistry.unregisterSubscription(message); this.subscriptionRegistry.unregisterSubscription(message);
} }
else if (SimpMessageType.MESSAGE.equals(messageType)) { else if (SimpMessageType.MESSAGE.equals(messageType)) {
preProcessMessage(message);
sendMessageToSubscribers(headers.getDestination(), message); sendMessageToSubscribers(headers.getDestination(), message);
} }
else if (SimpMessageType.DISCONNECT.equals(messageType)) { else if (SimpMessageType.DISCONNECT.equals(messageType)) {
preProcessMessage(message);
String sessionId = SimpMessageHeaderAccessor.wrap(message).getSessionId(); String sessionId = SimpMessageHeaderAccessor.wrap(message).getSessionId();
this.subscriptionRegistry.unregisterAllSubscriptions(sessionId); this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
} }
} }
private void preProcessMessage(Message<?> message) {
if (logger.isTraceEnabled()) {
logger.trace("Processing " + message);
}
}
protected void sendMessageToSubscribers(String destination, Message<?> message) { protected void sendMessageToSubscribers(String destination, Message<?> message) {
MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message); MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
for (String sessionId : subscriptions.keySet()) { for (String sessionId : subscriptions.keySet()) {

View File

@ -103,7 +103,7 @@ public class StompProtocolHandler implements SubProtocolHandler {
// http://stomp.github.io/stomp-specification-1.2.html#Size_Limits // http://stomp.github.io/stomp-specification-1.2.html#Size_Limits
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Processing STOMP message: " + message); logger.trace("Message " + message);
} }
try { try {

View File

@ -82,7 +82,10 @@ public class GenericMessage<T> implements Message<T>, Serializable {
} }
public String toString() { public String toString() {
return "[Payload=" + this.payload + "][Headers=" + this.headers + "]"; StringBuilder sb = new StringBuilder("[Headers=" + this.headers + "]");
sb.append("[Payload ").append(this.payload.getClass().getSimpleName());
sb.append(" content=").append(this.payload).append("]");
return sb.toString();
} }
public int hashCode() { public int hashCode() {

View File

@ -103,7 +103,7 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName
Assert.notNull(message, "Message must not be null"); Assert.notNull(message, "Message must not be null");
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[" + this.beanName + "] sending message " + message); logger.trace("[" + this.beanName + "] send message " + message);
} }
message = this.interceptorChain.preSend(message, this); message = this.interceptorChain.preSend(message, this);

View File

@ -18,6 +18,7 @@ package org.springframework.messaging.support.channel;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -57,21 +58,30 @@ class ChannelInterceptorChain {
public Message<?> preSend(Message<?> message, MessageChannel channel) { public Message<?> preSend(Message<?> message, MessageChannel channel) {
UUID originalId = message.getHeaders().getId();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("preSend on channel '" + channel + "', message: " + message); logger.trace("preSend message id " + originalId);
} }
for (ChannelInterceptor interceptor : this.interceptors) { for (ChannelInterceptor interceptor : this.interceptors) {
message = interceptor.preSend(message, channel); message = interceptor.preSend(message, channel);
if (message == null) { if (message == null) {
if (logger.isTraceEnabled()) {
logger.trace("preSend returned null (precluding the send)");
}
return null; return null;
} }
} }
if (logger.isTraceEnabled()) {
if (!message.getHeaders().getId().equals(originalId)) {
logger.trace("preSend returned modified message " + message);
}
}
return message; return message;
} }
public void postSend(Message<?> message, MessageChannel channel, boolean sent) { public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("postSend (sent=" + sent + ") on channel '" + channel + "', message: " + message); logger.trace("postSend (sent=" + sent + ") message id " + message.getHeaders().getId());
} }
for (ChannelInterceptor interceptor : this.interceptors) { for (ChannelInterceptor interceptor : this.interceptors) {
interceptor.postSend(message, channel, sent); interceptor.postSend(message, channel, sent);

View File

@ -64,6 +64,9 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
@Override @Override
public boolean sendInternal(final Message<?> message, long timeout) { public boolean sendInternal(final Message<?> message, long timeout) {
logger.trace("subscribers " + this.handlers);
for (final MessageHandler handler : this.handlers) { for (final MessageHandler handler : this.handlers) {
if (this.executor == null) { if (this.executor == null) {
handler.handleMessage(message); handler.handleMessage(message);

View File

@ -55,6 +55,8 @@ public class ReplyToMethodReturnValueHandlerTests {
private ReplyToMethodReturnValueHandler handler; private ReplyToMethodReturnValueHandler handler;
private ReplyToMethodReturnValueHandler handlerAnnotationNotRequired;
@Mock private MessageChannel messageChannel; @Mock private MessageChannel messageChannel;
@Captor ArgumentCaptor<Message<?>> messageCaptor; @Captor ArgumentCaptor<Message<?>> messageCaptor;
@ -80,7 +82,8 @@ public class ReplyToMethodReturnValueHandlerTests {
SimpMessagingTemplate messagingTemplate = new SimpMessagingTemplate(this.messageChannel); SimpMessagingTemplate messagingTemplate = new SimpMessagingTemplate(this.messageChannel);
messagingTemplate.setConverter(this.messageConverter); messagingTemplate.setConverter(this.messageConverter);
this.handler = new ReplyToMethodReturnValueHandler(messagingTemplate); this.handler = new ReplyToMethodReturnValueHandler(messagingTemplate, true);
this.handlerAnnotationNotRequired = new ReplyToMethodReturnValueHandler(messagingTemplate, false);
Method method = this.getClass().getDeclaredMethod("handleAndReplyTo"); Method method = this.getClass().getDeclaredMethod("handleAndReplyTo");
this.replyToReturnType = new MethodParameter(method, -1); this.replyToReturnType = new MethodParameter(method, -1);
@ -98,6 +101,7 @@ public class ReplyToMethodReturnValueHandlerTests {
assertTrue(this.handler.supportsReturnType(this.replyToReturnType)); assertTrue(this.handler.supportsReturnType(this.replyToReturnType));
assertTrue(this.handler.supportsReturnType(this.replyToUserReturnType)); assertTrue(this.handler.supportsReturnType(this.replyToUserReturnType));
assertFalse(this.handler.supportsReturnType(this.missingReplyToReturnType)); assertFalse(this.handler.supportsReturnType(this.missingReplyToReturnType));
assertTrue(this.handlerAnnotationNotRequired.supportsReturnType(this.missingReplyToReturnType));
} }
@Test @Test

View File

@ -83,12 +83,35 @@ public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrati
public void simpleController() throws Exception { public void simpleController() throws Exception {
TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build(); TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build();
WebSocketSession session = doHandshake(new TestClientWebSocketHandler(message, 0), "/ws"); WebSocketSession session = doHandshake(new TestClientWebSocketHandler(0, message), "/ws");
SimpleController controller = this.wac.getBean(SimpleController.class); SimpleController controller = this.wac.getBean(SimpleController.class);
assertTrue(controller.latch.await(2, TimeUnit.SECONDS)); try {
assertTrue(controller.latch.await(2, TimeUnit.SECONDS));
}
finally {
session.close();
}
}
session.close(); @Test
public void incrementController() throws Exception {
TextMessage message1 = create(StompCommand.SUBSCRIBE).headers(
"id:subs1", "destination:/topic/increment").body("5").build();
TextMessage message2 = create(StompCommand.SEND).headers(
"destination:/app/topic/increment").body("5").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
WebSocketSession session = doHandshake(clientHandler, "/ws");
try {
assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));
}
finally {
session.close();
}
} }
@ -97,15 +120,25 @@ public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrati
private CountDownLatch latch = new CountDownLatch(1); private CountDownLatch latch = new CountDownLatch(1);
@MessageMapping(value="/app/simple") @MessageMapping(value="/simple")
public void handle() { public void handle() {
this.latch.countDown(); this.latch.countDown();
} }
} }
@IntegrationTestController
static class IncrementController {
@MessageMapping(value="/topic/increment")
public int handle(int i) {
return i + 1;
}
}
private static class TestClientWebSocketHandler extends TextWebSocketHandlerAdapter { private static class TestClientWebSocketHandler extends TextWebSocketHandlerAdapter {
private final TextMessage messageToSend; private final TextMessage[] messagesToSend;
private final int expected; private final int expected;
@ -114,15 +147,17 @@ public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrati
private final CountDownLatch latch; private final CountDownLatch latch;
public TestClientWebSocketHandler(TextMessage messageToSend, int expectedNumberOfMessages) { public TestClientWebSocketHandler(int expectedNumberOfMessages, TextMessage... messagesToSend) {
this.messageToSend = messageToSend; this.messagesToSend = messagesToSend;
this.expected = expectedNumberOfMessages; this.expected = expectedNumberOfMessages;
this.latch = new CountDownLatch(this.expected); this.latch = new CountDownLatch(this.expected);
} }
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
session.sendMessage(this.messageToSend); for (TextMessage message : this.messagesToSend) {
session.sendMessage(message);
}
} }
@Override @Override
@ -134,6 +169,7 @@ public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrati
@Configuration @Configuration
@ComponentScan(basePackageClasses=AnnotationMethodIntegrationTests.class, @ComponentScan(basePackageClasses=AnnotationMethodIntegrationTests.class,
useDefaultFilters=false,
includeFilters=@ComponentScan.Filter(IntegrationTestController.class)) includeFilters=@ComponentScan.Filter(IntegrationTestController.class))
static class TestMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer { static class TestMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer {
@ -147,7 +183,7 @@ public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrati
@Override @Override
public void configureMessageBroker(MessageBrokerConfigurer configurer) { public void configureMessageBroker(MessageBrokerConfigurer configurer) {
configurer.setAnnotationMethodDestinationPrefixes("/app/"); configurer.setAnnotationMethodDestinationPrefixes("/app");
configurer.enableSimpleBroker("/topic", "/queue"); configurer.enableSimpleBroker("/topic", "/queue");
} }
} }

View File

@ -0,0 +1,61 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
/**
* Test fixture for {@link AnnotationMethodMessageHandler}.
* @author Rossen Stoyanchev
*/
public class AnnotationMethodMessageHandlerTests {
@Test(expected=IllegalStateException.class)
public void duplicateMappings() {
StaticApplicationContext cxt = new StaticApplicationContext();
cxt.registerSingleton("d", DuplicateMappingController.class);
cxt.refresh();
MessageChannel channel = Mockito.mock(MessageChannel.class);
SimpMessageSendingOperations brokerTemplate = new SimpMessagingTemplate(channel);
AnnotationMethodMessageHandler mh = new AnnotationMethodMessageHandler(brokerTemplate, channel);
mh.setApplicationContext(cxt);
mh.afterPropertiesSet();
}
@Controller
static class DuplicateMappingController {
@MessageMapping(value="/duplicate")
public void handle1() { }
@MessageMapping(value="/duplicate")
public void handle2() { }
}
}

View File

@ -12,7 +12,7 @@
</appender> </appender>
<logger name="org.springframework.messaging"> <logger name="org.springframework.messaging">
<level value="info" /> <level value="trace" />
</logger> </logger>
<logger name="org.apache.activemq"> <logger name="org.apache.activemq">

View File

@ -50,7 +50,7 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator
@Override @Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received " + message + ", " + session); logger.debug(message + ", " + session);
} }
super.handleMessage(session, message); super.handleMessage(session, message);
} }

View File

@ -26,7 +26,6 @@ import org.apache.coyote.http11.Http11NioProtocol;
import org.apache.tomcat.util.descriptor.web.ApplicationListener; import org.apache.tomcat.util.descriptor.web.ApplicationListener;
import org.apache.tomcat.websocket.server.WsListener; import org.apache.tomcat.websocket.server.WsListener;
import org.springframework.core.NestedRuntimeException; import org.springframework.core.NestedRuntimeException;
import org.springframework.util.Assert;
import org.springframework.util.SocketUtils; import org.springframework.util.SocketUtils;
import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.servlet.DispatcherServlet; import org.springframework.web.servlet.DispatcherServlet;
@ -94,9 +93,10 @@ public class TomcatWebSocketTestServer implements WebSocketTestServer {
@Override @Override
public void undeployConfig() { public void undeployConfig() {
Assert.notNull(this.context, "deployConfig/undeployConfig must be invoked in pairs"); if (this.context != null) {
this.context.removeServletMapping("/"); this.context.removeServletMapping("/");
this.tomcatServer.getHost().removeChild(this.context); this.tomcatServer.getHost().removeChild(this.context);
}
} }
@Override @Override