diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationSimpMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java similarity index 97% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationSimpMessageHandler.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java index 51e2c3e72d..fa71006fb8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationSimpMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java @@ -63,9 +63,9 @@ import org.springframework.web.method.HandlerMethodSelector; * @author Rossen Stoyanchev * @since 4.0 */ -public class AnnotationSimpMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean { +public class AnnotationMethodMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean { - private static final Log logger = LogFactory.getLog(AnnotationSimpMessageHandler.class); + private static final Log logger = LogFactory.getLog(AnnotationMethodMessageHandler.class); private final MessageChannel outboundChannel; @@ -91,7 +91,7 @@ public class AnnotationSimpMessageHandler implements MessageHandler, Application * @param inboundChannel a channel for processing incoming messages from clients * @param outboundChannel a channel for messages going out to clients */ - public AnnotationSimpMessageHandler(MessageChannel outboundChannel) { + public AnnotationMethodMessageHandler(MessageChannel outboundChannel) { Assert.notNull(outboundChannel, "outboundChannel is required"); this.outboundChannel = outboundChannel; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/InMemoryUserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/InMemoryUserSessionResolver.java new file mode 100644 index 0000000000..c24acea17b --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/InMemoryUserSessionResolver.java @@ -0,0 +1,62 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class InMemoryUserSessionResolver implements UserSessionResolver, UserSessionStore { + + // userId -> sessionId's + private final Map> userSessionIds = new ConcurrentHashMap>(); + + + @Override + public void storeUserSessionId(String user, String sessionId) { + Set sessionIds = this.userSessionIds.get(user); + if (sessionIds == null) { + sessionIds = new CopyOnWriteArraySet(); + this.userSessionIds.put(user, sessionIds); + } + sessionIds.add(sessionId); + } + + @Override + public void deleteUserSessionId(String user, String sessionId) { + Set sessionIds = this.userSessionIds.get(user); + if (sessionIds != null) { + if (sessionIds.remove(sessionId) && sessionIds.isEmpty()) { + this.userSessionIds.remove(user); + } + } + } + + @Override + public Set resolveUserSessionIds(String user) { + Set sessionIds = this.userSessionIds.get(user); + return (sessionIds != null) ? sessionIds : Collections.emptySet(); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java new file mode 100644 index 0000000000..2422959847 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java @@ -0,0 +1,187 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.core.MessageSendingOperations; +import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + + +/** + * + * Supports destinations prefixed with "/user/{username}" and resolves them into a + * destination to which the user is currently subscribed by appending the user session id. + * For example a destination such as "/user/john/queue/trade-confirmation" would resolve + * to "/trade-confirmation/i9oqdfzo" if "i9oqdfzo" is the user's session id. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class UserDestinationMessageHandler implements MessageHandler { + + private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class); + + private final MessageSendingOperations messagingTemplate; + + private String prefix = "/user/"; + + private UserSessionResolver userSessionResolver = new InMemoryUserSessionResolver(); + + + public UserDestinationMessageHandler(MessageSendingOperations messagingTemplate) { + this.messagingTemplate = messagingTemplate; + } + + /** + *

The default prefix is "/user". + * @param prefix the prefix to set + */ + public void setPrefix(String prefix) { + Assert.hasText(prefix, "prefix is required"); + this.prefix = prefix.endsWith("/") ? prefix : prefix + "/"; + } + + /** + * @return the prefix + */ + public String getPrefix() { + return this.prefix; + } + + /** + * @param userSessionResolver the userSessionResolver to set + */ + public void setUserSessionResolver(UserSessionResolver userSessionResolver) { + this.userSessionResolver = userSessionResolver; + } + + /** + * @return the userSessionResolver + */ + public UserSessionResolver getUserSessionResolver() { + return this.userSessionResolver; + } + + /** + * @return the messagingTemplate + */ + public MessageSendingOperations getMessagingTemplate() { + return this.messagingTemplate; + } + + @Override + public void handleMessage(Message message) throws MessagingException { + + if (!shouldHandle(message)) { + return; + } + + SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); + String destination = headers.getDestination(); + + if (logger.isTraceEnabled()) { + logger.trace("Processing message to destination " + destination); + } + + UserDestinationParser destinationParser = new UserDestinationParser(destination); + String user = destinationParser.getUser(); + + if (user == null) { + if (logger.isErrorEnabled()) { + logger.error("Ignoring message, expected destination \"" + this.prefix + + "{userId}/**\": " + destination); + } + return; + } + + for (String sessionId : this.userSessionResolver.resolveUserSessionIds(user)) { + + String targetDestination = destinationParser.getTargetDestination(sessionId); + headers.setDestination(targetDestination); + message = MessageBuilder.fromMessage(message).copyHeaders(headers.toMap()).build(); + + if (logger.isTraceEnabled()) { + logger.trace("Sending message to resolved target destination " + targetDestination); + } + this.messagingTemplate.send(targetDestination, message); + } + } + + protected boolean shouldHandle(Message message) { + + SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); + SimpMessageType messageType = headers.getMessageType(); + String destination = headers.getDestination(); + + if (!SimpMessageType.MESSAGE.equals(messageType)) { + return false; + } + + if (!StringUtils.hasText(destination)) { + if (logger.isErrorEnabled()) { + logger.error("Ignoring message, no destination: " + headers); + } + return false; + } + else if (!destination.startsWith(this.prefix)) { + return false; + } + + return true; + } + + + private class UserDestinationParser { + + private final String user; + + private final String targetDestination; + + + public UserDestinationParser(String destination) { + + int userStartIndex = prefix.length(); + int userEndIndex = destination.indexOf('/', userStartIndex); + + if (userEndIndex > 0) { + this.user = destination.substring(userStartIndex, userEndIndex); + this.targetDestination = destination.substring(userEndIndex); + } + else { + this.user = null; + this.targetDestination = null; + } + } + + public String getUser() { + return this.user; + } + + public String getTargetDestination(String sessionId) { + return (this.targetDestination != null) ? this.targetDestination + "/" + sessionId : null; + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java new file mode 100644 index 0000000000..18b0604500 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java @@ -0,0 +1,38 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import java.util.Set; + + +/** + * A strategy for resolving a user name to one or more session id's. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface UserSessionResolver { + + /** + * Retrieve the sessionId(s) associated with the given user. + * + * @param user the user name + * @return a Set with zero, one, or more, current session id's. + */ + Set resolveUserSessionIds(String user); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionStore.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionStore.java new file mode 100644 index 0000000000..e2423a110a --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionStore.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface UserSessionStore { + + void storeUserSessionId(String user, String sessionId); + + void deleteUserSessionId(String user, String sessionId); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/package-info.java new file mode 100644 index 0000000000..888d58bde3 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/package-info.java @@ -0,0 +1,4 @@ +/** + * MessageHandler implementation and supporting classes for message processing. + */ +package org.springframework.messaging.simp.handler; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java index 30edde3f04..e868da7f87 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java @@ -27,6 +27,8 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.handler.UserDestinationMessageHandler; +import org.springframework.messaging.simp.handler.UserSessionStore; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; @@ -48,6 +50,12 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement */ public static final String CONNECTED_USER_HEADER = "user-name"; + /** + * A suffix unique to the current session that a client can append to a destination. + * @see UserDestinationMessageHandler + */ + public static final String QUEUE_SUFFIX_HEADER = "queue-suffix"; + private static final byte[] EMPTY_PAYLOAD = new byte[0]; @@ -55,6 +63,8 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement private MessageChannel clientInputChannel; + private UserSessionStore userSessionStore; + private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); private final Map sessions = new ConcurrentHashMap(); @@ -70,14 +80,35 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement } + /** + * Configure a store for saving user session information. + * @param userSessionStore the userSessionStore to use to store user session id's + * @see UserDestinationMessageHandler + */ + public void setUserSessionResolver(UserSessionStore userSessionStore) { + this.userSessionStore = userSessionStore; + } + + /** + * @return the userSessionResolver + */ + public UserSessionStore getUserSessionResolver() { + return this.userSessionStore; + } + public StompMessageConverter getStompMessageConverter() { return this.stompMessageConverter; } + @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { Assert.notNull(this.clientInputChannel, "No output channel for STOMP messages."); this.sessions.put(session.getId(), session); + + if ((this.userSessionStore != null) && (session.getPrincipal() != null)) { + this.userSessionStore.storeUserSessionId(session.getPrincipal().getName(), session.getId()); + } } /** @@ -146,6 +177,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement if (session.getPrincipal() != null) { connectedHeaders.setNativeHeader(CONNECTED_USER_HEADER, session.getPrincipal().getName()); + connectedHeaders.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId()); } // TODO: security @@ -178,6 +210,10 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement String sessionId = session.getId(); this.sessions.remove(sessionId); + if ((this.userSessionStore != null) && (session.getPrincipal() != null)) { + this.userSessionStore.deleteUserSessionId(session.getPrincipal().getName(), sessionId); + } + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.setSessionId(sessionId); Message message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toMap()).build();