Add SimpMessageSendingOperations

This commit is contained in:
Rossen Stoyanchev 2013-07-17 10:09:55 -04:00
parent 078cfb3e78
commit ba7998d03b
8 changed files with 140 additions and 60 deletions

View File

@ -0,0 +1,35 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.core.MessageSendingOperations;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface SimpMessageSendingOperations extends MessageSendingOperations<String> {
<T> void convertAndSendToUser(String user, String destination, T message) throws MessagingException;
<T> void convertAndSendToUser(String user, String destination, T message, MessagePostProcessor postProcessor)
throws MessagingException;
}

View File

@ -20,7 +20,9 @@ import java.util.Arrays;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
@ -32,19 +34,45 @@ import org.springframework.util.Assert;
* @author Mark Fisher
* @since 4.0
*/
public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String> {
public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String>
implements SimpMessageSendingOperations {
private final MessageChannel outputChannel;
private final MessageChannel messageChannel;
private String userDestinationPrefix = "/user/";
private volatile long sendTimeout = -1;
public SimpMessagingTemplate(MessageChannel outputChannel) {
Assert.notNull(outputChannel, "outputChannel is required");
this.outputChannel = outputChannel;
public SimpMessagingTemplate(MessageChannel messageChannel) {
Assert.notNull(messageChannel, "outputChannel is required");
this.messageChannel = messageChannel;
}
/**
* Configure the prefix to use for destinations targeting a specific user.
* <p>The default value is "/user/".
* @see org.springframework.messaging.simp.handler.UserDestinationMessageHandler
*/
public void setUserDestinationPrefix(String prefix) {
this.userDestinationPrefix = prefix;
}
/**
* @return the userDestinationPrefix
*/
public String getUserDestinationPrefix() {
return this.userDestinationPrefix;
}
/**
* @return the messageChannel
*/
public MessageChannel getMessageChannel() {
return this.messageChannel;
}
/**
* Specify the timeout value to use for send operations.
*
@ -54,6 +82,13 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
this.sendTimeout = sendTimeout;
}
/**
* @return the sendTimeout
*/
public long getSendTimeout() {
return this.sendTimeout;
}
@Override
public <P> void send(Message<P> message) {
@ -64,22 +99,35 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
@Override
protected void doSend(String destination, Message<?> message) {
Assert.notNull(destination, "destination is required");
message = addDestinationToMessage(message, destination);
message = updateMessageHeaders(message, destination);
long timeout = this.sendTimeout;
boolean sent = (timeout >= 0)
? this.outputChannel.send(message, timeout)
: this.outputChannel.send(message);
? this.messageChannel.send(message, timeout)
: this.messageChannel.send(message);
if (!sent) {
throw new MessageDeliveryException(message,
"failed to send message to destination '" + destination + "' within timeout: " + timeout);
}
}
protected <P> Message<P> addDestinationToMessage(Message<P> message, String destination) {
protected <P> Message<P> updateMessageHeaders(Message<P> message, String destination) {
Assert.notNull(destination, "destination is required");
return MessageBuilder.fromMessage(message)
.setHeader(SimpMessageHeaderAccessor.MESSAGE_TYPE, SimpMessageType.MESSAGE)
.setHeader(SimpMessageHeaderAccessor.DESTINATIONS, Arrays.asList(destination)).build();
}
@Override
public <T> void convertAndSendToUser(String user, String destination, T message) throws MessagingException {
convertAndSendToUser(user, destination, message, null);
}
@Override
public <T> void convertAndSendToUser(String user, String destination, T message,
MessagePostProcessor postProcessor) throws MessagingException {
Assert.notNull(user, "user is required");
convertAndSend(this.userDestinationPrefix + user + destination, message, postProcessor);
}
}

View File

@ -17,18 +17,15 @@
package org.springframework.messaging.simp.annotation.support;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.core.MessageSendingOperations;
import org.springframework.messaging.handler.annotation.ReplyTo;
import org.springframework.messaging.handler.method.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.annotation.ReplyToUser;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
@ -49,10 +46,10 @@ import org.springframework.util.Assert;
*/
public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
private final MessageSendingOperations<String> messagingTemplate;
private final SimpMessageSendingOperations messagingTemplate;
public ReplyToMethodReturnValueHandler(MessageSendingOperations<String> messagingTemplate) {
public ReplyToMethodReturnValueHandler(SimpMessageSendingOperations messagingTemplate) {
Assert.notNull(messagingTemplate, "messagingTemplate is required");
this.messagingTemplate = messagingTemplate;
}
@ -72,27 +69,24 @@ public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValue
return;
}
MessagePostProcessor postProcessor = new SessionHeaderPostProcessor(inputMessage);
ReplyTo replyTo = returnType.getMethodAnnotation(ReplyTo.class);
ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class);
List<String> destinations = new ArrayList<String>();
if (replyTo != null) {
destinations.addAll(Arrays.asList(replyTo.value()));
}
if (replyToUser != null) {
Principal user = getUser(inputMessage);
for (String destination : replyToUser.value()) {
destinations.add("/user/" + user.getName() + destination);
}
}
MessagePostProcessor postProcessor = new SessionIdHeaderPostProcessor(inputMessage);
for (String destination : destinations) {
for (String destination : replyTo.value()) {
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
}
}
ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class);
if (replyToUser != null) {
String user = getUser(inputMessage).getName();
for (String destination : replyToUser.value()) {
this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, postProcessor);
}
}
}
private Principal getUser(Message<?> inputMessage) {
SimpMessageHeaderAccessor inputHeaders = SimpMessageHeaderAccessor.wrap(inputMessage);
Principal user = inputHeaders.getUser();
@ -103,12 +97,12 @@ public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValue
}
private final class SessionIdHeaderPostProcessor implements MessagePostProcessor {
private final class SessionHeaderPostProcessor implements MessagePostProcessor {
private final Message<?> inputMessage;
public SessionIdHeaderPostProcessor(Message<?> inputMessage) {
public SessionHeaderPostProcessor(Message<?> inputMessage) {
this.inputMessage = inputMessage;
}

View File

@ -74,27 +74,26 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
"No subsriptiondId in input message. Add @ReplyTo or @ReplyToUser to method: "
+ returnType.getMethod());
MessagePostProcessor postProcessor = new InputHeaderCopyingPostProcessor(inputHeaders);
MessagePostProcessor postProcessor = new SubscriptionHeaderPostProcessor(inputHeaders);
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
}
private final class InputHeaderCopyingPostProcessor implements MessagePostProcessor {
private final class SubscriptionHeaderPostProcessor implements MessagePostProcessor {
private final SimpMessageHeaderAccessor inputHeaders;
public InputHeaderCopyingPostProcessor(SimpMessageHeaderAccessor inputHeaders) {
public SubscriptionHeaderPostProcessor(SimpMessageHeaderAccessor inputHeaders) {
this.inputHeaders = inputHeaders;
}
@Override
public Message<?> postProcessMessage(Message<?> message) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
return MessageBuilder.fromMessage(message)
.setHeader(SimpMessageHeaderAccessor.SESSION_ID, this.inputHeaders.getSessionId())
.setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID, this.inputHeaders.getSubscriptionId())
.copyHeaders(headers.toMap()).build();
.build();
}
}
}

View File

@ -34,10 +34,10 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.ReplyTo;
import org.springframework.messaging.handler.annotation.support.ExceptionHandlerMethodResolver;
import org.springframework.messaging.handler.annotation.support.MessageBodyMethodArgumentResolver;
import org.springframework.messaging.handler.method.HandlerMethod;
@ -46,8 +46,8 @@ import org.springframework.messaging.handler.method.HandlerMethodReturnValueHand
import org.springframework.messaging.handler.method.HandlerMethodSelector;
import org.springframework.messaging.handler.method.InvocableHandlerMethod;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeEvent;
import org.springframework.messaging.simp.annotation.UnsubscribeEvent;
import org.springframework.messaging.simp.annotation.support.PrincipalMethodArgumentResolver;
@ -68,9 +68,9 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
private static final Log logger = LogFactory.getLog(AnnotationMethodMessageHandler.class);
private final MessageChannel inboundChannel;
private final SimpMessageSendingOperations inboundMessagingTemplate;
private final MessageChannel outboundChannel;
private final SimpMessageSendingOperations outboundMessagingTemplate;
private MessageConverter<?> messageConverter;
@ -91,14 +91,24 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
/**
* @param inboundChannel a channel for processing incoming messages from clients
* @param outboundChannel a channel for messages going out to clients
* @param inboundMessagingTemplate a template for sending messages on the channel
* where incoming messages from clients are sent; essentially messages sent
* through this template will be re-processed by the application. One example
* is the use of {@link ReplyTo} annotation on a method to send a broadcast
* message.
* @param outboundMessagingTemplate a template for sending messages on the client used
* to send messages back out to connected clients; such messages must have all
* necessary information to reach the client such as session and subscription
* id's. One example is returning a value from an {@link SubscribeEvent}
* method.
*/
public AnnotationMethodMessageHandler(MessageChannel inboundChannel, MessageChannel outboundChannel) {
Assert.notNull(inboundChannel, "inboundChannel is required");
Assert.notNull(outboundChannel, "outboundChannel is required");
this.inboundChannel = inboundChannel;
this.outboundChannel = outboundChannel;
public AnnotationMethodMessageHandler(SimpMessageSendingOperations inboundMessagingTemplate,
SimpMessageSendingOperations outboundMessagingTemplate) {
Assert.notNull(inboundMessagingTemplate, "inboundMessagingTemplate is required");
Assert.notNull(outboundMessagingTemplate, "outboundMessagingTemplate is required");
this.inboundMessagingTemplate = inboundMessagingTemplate;
this.outboundMessagingTemplate = outboundMessagingTemplate;
}
/**
@ -121,14 +131,8 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
this.argumentResolvers.addResolver(new PrincipalMethodArgumentResolver());
this.argumentResolvers.addResolver(new MessageBodyMethodArgumentResolver(this.messageConverter));
SimpMessagingTemplate inboundMessagingTemplate = new SimpMessagingTemplate(this.inboundChannel);
inboundMessagingTemplate.setConverter(this.messageConverter);
SimpMessagingTemplate outboundMessagingTemplate = new SimpMessagingTemplate(this.outboundChannel);
outboundMessagingTemplate.setConverter(this.messageConverter);
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(inboundMessagingTemplate));
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(outboundMessagingTemplate));
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.inboundMessagingTemplate));
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.outboundMessagingTemplate));
}
protected void initHandlerMethods() {

View File

@ -21,7 +21,7 @@ package org.springframework.messaging.simp.handler;
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface UserSessionStore {
public interface MutableUserSessionResolver extends UserSessionResolver {
void storeUserSessionId(String user, String sessionId);

View File

@ -27,7 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleUserSessionResolver implements UserSessionResolver, UserSessionStore {
public class SimpleUserSessionResolver implements MutableUserSessionResolver {
// userId -> sessionId's
private final Map<String, Set<String>> userSessionIds = new ConcurrentHashMap<String, Set<String>>();

View File

@ -28,7 +28,7 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.UserDestinationMessageHandler;
import org.springframework.messaging.simp.handler.UserSessionStore;
import org.springframework.messaging.simp.handler.MutableUserSessionResolver;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
@ -63,7 +63,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
private MessageChannel clientInputChannel;
private UserSessionStore userSessionStore;
private MutableUserSessionResolver userSessionStore;
private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
@ -85,14 +85,14 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
* @param userSessionStore the userSessionStore to use to store user session id's
* @see UserDestinationMessageHandler
*/
public void setUserSessionResolver(UserSessionStore userSessionStore) {
public void setUserSessionResolver(MutableUserSessionResolver userSessionStore) {
this.userSessionStore = userSessionStore;
}
/**
* @return the userSessionResolver
*/
public UserSessionStore getUserSessionResolver() {
public MutableUserSessionResolver getUserSessionResolver() {
return this.userSessionStore;
}