Add support for sending private messages

The new UserDestinationMessageHandler resolves messages with
destinations prefixed with "/user/{username}" and resolves them into a
destination to which the user is currently subscribed by appending the
user session id.

For example a destination such as "/user/john/queue/trade-confirmation"
would resolve "/trade-confirmation/i9oqdfzo" assuming "i9oqdfzo" is the
user's session id.
This commit is contained in:
Rossen Stoyanchev 2013-07-14 21:10:15 -04:00
parent 2a48ad88fb
commit 5d20b75dc2
7 changed files with 360 additions and 3 deletions

View File

@ -63,9 +63,9 @@ import org.springframework.web.method.HandlerMethodSelector;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class AnnotationSimpMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean {
public class AnnotationMethodMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean {
private static final Log logger = LogFactory.getLog(AnnotationSimpMessageHandler.class);
private static final Log logger = LogFactory.getLog(AnnotationMethodMessageHandler.class);
private final MessageChannel outboundChannel;
@ -91,7 +91,7 @@ public class AnnotationSimpMessageHandler implements MessageHandler, Application
* @param inboundChannel a channel for processing incoming messages from clients
* @param outboundChannel a channel for messages going out to clients
*/
public AnnotationSimpMessageHandler(MessageChannel outboundChannel) {
public AnnotationMethodMessageHandler(MessageChannel outboundChannel) {
Assert.notNull(outboundChannel, "outboundChannel is required");
this.outboundChannel = outboundChannel;
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class InMemoryUserSessionResolver implements UserSessionResolver, UserSessionStore {
// userId -> sessionId's
private final Map<String, Set<String>> userSessionIds = new ConcurrentHashMap<String, Set<String>>();
@Override
public void storeUserSessionId(String user, String sessionId) {
Set<String> sessionIds = this.userSessionIds.get(user);
if (sessionIds == null) {
sessionIds = new CopyOnWriteArraySet<String>();
this.userSessionIds.put(user, sessionIds);
}
sessionIds.add(sessionId);
}
@Override
public void deleteUserSessionId(String user, String sessionId) {
Set<String> sessionIds = this.userSessionIds.get(user);
if (sessionIds != null) {
if (sessionIds.remove(sessionId) && sessionIds.isEmpty()) {
this.userSessionIds.remove(user);
}
}
}
@Override
public Set<String> resolveUserSessionIds(String user) {
Set<String> sessionIds = this.userSessionIds.get(user);
return (sessionIds != null) ? sessionIds : Collections.<String>emptySet();
}
}

View File

@ -0,0 +1,187 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.MessageSendingOperations;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
*
* Supports destinations prefixed with "/user/{username}" and resolves them into a
* destination to which the user is currently subscribed by appending the user session id.
* For example a destination such as "/user/john/queue/trade-confirmation" would resolve
* to "/trade-confirmation/i9oqdfzo" if "i9oqdfzo" is the user's session id.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class UserDestinationMessageHandler implements MessageHandler {
private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class);
private final MessageSendingOperations<String> messagingTemplate;
private String prefix = "/user/";
private UserSessionResolver userSessionResolver = new InMemoryUserSessionResolver();
public UserDestinationMessageHandler(MessageSendingOperations<String> messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
/**
* <p>The default prefix is "/user".
* @param prefix the prefix to set
*/
public void setPrefix(String prefix) {
Assert.hasText(prefix, "prefix is required");
this.prefix = prefix.endsWith("/") ? prefix : prefix + "/";
}
/**
* @return the prefix
*/
public String getPrefix() {
return this.prefix;
}
/**
* @param userSessionResolver the userSessionResolver to set
*/
public void setUserSessionResolver(UserSessionResolver userSessionResolver) {
this.userSessionResolver = userSessionResolver;
}
/**
* @return the userSessionResolver
*/
public UserSessionResolver getUserSessionResolver() {
return this.userSessionResolver;
}
/**
* @return the messagingTemplate
*/
public MessageSendingOperations<String> getMessagingTemplate() {
return this.messagingTemplate;
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
if (!shouldHandle(message)) {
return;
}
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
String destination = headers.getDestination();
if (logger.isTraceEnabled()) {
logger.trace("Processing message to destination " + destination);
}
UserDestinationParser destinationParser = new UserDestinationParser(destination);
String user = destinationParser.getUser();
if (user == null) {
if (logger.isErrorEnabled()) {
logger.error("Ignoring message, expected destination \"" + this.prefix
+ "{userId}/**\": " + destination);
}
return;
}
for (String sessionId : this.userSessionResolver.resolveUserSessionIds(user)) {
String targetDestination = destinationParser.getTargetDestination(sessionId);
headers.setDestination(targetDestination);
message = MessageBuilder.fromMessage(message).copyHeaders(headers.toMap()).build();
if (logger.isTraceEnabled()) {
logger.trace("Sending message to resolved target destination " + targetDestination);
}
this.messagingTemplate.send(targetDestination, message);
}
}
protected boolean shouldHandle(Message<?> message) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
SimpMessageType messageType = headers.getMessageType();
String destination = headers.getDestination();
if (!SimpMessageType.MESSAGE.equals(messageType)) {
return false;
}
if (!StringUtils.hasText(destination)) {
if (logger.isErrorEnabled()) {
logger.error("Ignoring message, no destination: " + headers);
}
return false;
}
else if (!destination.startsWith(this.prefix)) {
return false;
}
return true;
}
private class UserDestinationParser {
private final String user;
private final String targetDestination;
public UserDestinationParser(String destination) {
int userStartIndex = prefix.length();
int userEndIndex = destination.indexOf('/', userStartIndex);
if (userEndIndex > 0) {
this.user = destination.substring(userStartIndex, userEndIndex);
this.targetDestination = destination.substring(userEndIndex);
}
else {
this.user = null;
this.targetDestination = null;
}
}
public String getUser() {
return this.user;
}
public String getTargetDestination(String sessionId) {
return (this.targetDestination != null) ? this.targetDestination + "/" + sessionId : null;
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import java.util.Set;
/**
* A strategy for resolving a user name to one or more session id's.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface UserSessionResolver {
/**
* Retrieve the sessionId(s) associated with the given user.
*
* @param user the user name
* @return a Set with zero, one, or more, current session id's.
*/
Set<String> resolveUserSessionIds(String user);
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface UserSessionStore {
void storeUserSessionId(String user, String sessionId);
void deleteUserSessionId(String user, String sessionId);
}

View File

@ -0,0 +1,4 @@
/**
* MessageHandler implementation and supporting classes for message processing.
*/
package org.springframework.messaging.simp.handler;

View File

@ -27,6 +27,8 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.UserDestinationMessageHandler;
import org.springframework.messaging.simp.handler.UserSessionStore;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
@ -48,6 +50,12 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
*/
public static final String CONNECTED_USER_HEADER = "user-name";
/**
* A suffix unique to the current session that a client can append to a destination.
* @see UserDestinationMessageHandler
*/
public static final String QUEUE_SUFFIX_HEADER = "queue-suffix";
private static final byte[] EMPTY_PAYLOAD = new byte[0];
@ -55,6 +63,8 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
private MessageChannel clientInputChannel;
private UserSessionStore userSessionStore;
private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<String, WebSocketSession>();
@ -70,14 +80,35 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
}
/**
* Configure a store for saving user session information.
* @param userSessionStore the userSessionStore to use to store user session id's
* @see UserDestinationMessageHandler
*/
public void setUserSessionResolver(UserSessionStore userSessionStore) {
this.userSessionStore = userSessionStore;
}
/**
* @return the userSessionResolver
*/
public UserSessionStore getUserSessionResolver() {
return this.userSessionStore;
}
public StompMessageConverter getStompMessageConverter() {
return this.stompMessageConverter;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Assert.notNull(this.clientInputChannel, "No output channel for STOMP messages.");
this.sessions.put(session.getId(), session);
if ((this.userSessionStore != null) && (session.getPrincipal() != null)) {
this.userSessionStore.storeUserSessionId(session.getPrincipal().getName(), session.getId());
}
}
/**
@ -146,6 +177,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
if (session.getPrincipal() != null) {
connectedHeaders.setNativeHeader(CONNECTED_USER_HEADER, session.getPrincipal().getName());
connectedHeaders.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId());
}
// TODO: security
@ -178,6 +210,10 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
String sessionId = session.getId();
this.sessions.remove(sessionId);
if ((this.userSessionStore != null) && (session.getPrincipal() != null)) {
this.userSessionStore.deleteUserSessionId(session.getPrincipal().getName(), sessionId);
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.setSessionId(sessionId);
Message<?> message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toMap()).build();