From 90c4712d0682dc3efea93e97f978b5f000bbf54f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 19 Jul 2013 13:46:27 -0400 Subject: [PATCH] Revise UserSessionResolver to UserQueueSuffixResolver The resolver for /user/{username} prefixed destinations is now more explicitly designed to store queue suffixes rather than session id's, which is what we happen to use as queue suffixes. This allows something other than the sessionId to be used without having to change many places. It also enables applications to construct destinations with user-specific queue suffixes without making assumptions about what's used for queue suffixes. For example a controller may construct a map with subscription destinations and send that down to the client. --- ...va => MutableUserQueueSuffixResolver.java} | 6 +- .../SimpleUserQueueSuffixResolver.java | 75 +++++++++++++++++++ .../handler/SimpleUserSessionResolver.java | 65 ---------------- .../UserDestinationMessageHandler.java | 41 +++++----- ...lver.java => UserQueueSuffixResolver.java} | 20 +++-- .../simp/stomp/StompWebSocketHandler.java | 61 +++++++-------- ...> SimpleUserQueueSuffixResolverTests.java} | 36 ++++----- 7 files changed, 166 insertions(+), 138 deletions(-) rename spring-messaging/src/main/java/org/springframework/messaging/simp/handler/{MutableUserSessionResolver.java => MutableUserQueueSuffixResolver.java} (77%) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java rename spring-messaging/src/main/java/org/springframework/messaging/simp/handler/{UserSessionResolver.java => UserQueueSuffixResolver.java} (57%) rename spring-messaging/src/test/java/org/springframework/messaging/simp/handler/{SimpleUserSessionResolverTests.java => SimpleUserQueueSuffixResolverTests.java} (54%) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java similarity index 77% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java index 8ef3397d439..1fe326958d0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java @@ -21,10 +21,10 @@ package org.springframework.messaging.simp.handler; * @author Rossen Stoyanchev * @since 4.0 */ -public interface MutableUserSessionResolver extends UserSessionResolver { +public interface MutableUserQueueSuffixResolver extends UserQueueSuffixResolver { - void addUserSessionId(String user, String sessionId); + void addQueueSuffix(String user, String sessionId, String suffix); - void removeUserSessionId(String user, String sessionId); + void removeQueueSuffix(String user, String sessionId); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java new file mode 100644 index 00000000000..6b3d018b59f --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java @@ -0,0 +1,75 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class SimpleUserQueueSuffixResolver implements MutableUserQueueSuffixResolver { + + // userId -> [sessionId -> queueSuffix] + private final ConcurrentMap> cache = new ConcurrentHashMap>(); + + + @Override + public void addQueueSuffix(String user, String sessionId, String suffix) { + Map suffixes = this.cache.get(user); + if (suffixes == null) { + suffixes = new ConcurrentHashMap(); + Map prevSuffixes = this.cache.putIfAbsent(user, suffixes); + if (prevSuffixes != null) { + suffixes = prevSuffixes; + } + } + suffixes.put(sessionId, suffix); + } + + @Override + public void removeQueueSuffix(String user, String sessionId) { + Map suffixes = this.cache.get(user); + if (suffixes != null) { + if (suffixes.remove(sessionId) != null) { + this.cache.remove(user, Collections.emptyMap()); + } + } + } + + @Override + public Set getUserQueueSuffixes(String user) { + Map suffixes = this.cache.get(user); + return (suffixes != null) ? new HashSet(suffixes.values()) : Collections.emptySet(); + } + + @Override + public String getUserQueueSuffix(String user, String sessionId) { + Map suffixes = this.cache.get(user); + if (suffixes != null) { + return suffixes.get(sessionId); + } + return null; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java deleted file mode 100644 index cd897f30245..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.handler; - -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArraySet; - - -/** - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class SimpleUserSessionResolver implements MutableUserSessionResolver { - - // userId -> sessionId's - private final ConcurrentMap> userSessionIds = new ConcurrentHashMap>(); - - - @Override - public void addUserSessionId(String user, String sessionId) { - Set sessionIds = this.userSessionIds.get(user); - if (sessionIds == null) { - sessionIds = new CopyOnWriteArraySet(); - Set value = this.userSessionIds.putIfAbsent(user, sessionIds); - if (value != null) { - sessionIds = value; - } - } - sessionIds.add(sessionId); - } - - @Override - public void removeUserSessionId(String user, String sessionId) { - Set sessionIds = this.userSessionIds.get(user); - if (sessionIds != null) { - if (sessionIds.remove(sessionId)) { - this.userSessionIds.remove(user, Collections.emptySet()); - } - } - } - - @Override - public Set resolveUserSessionIds(String user) { - Set sessionIds = this.userSessionIds.get(user); - return (sessionIds != null) ? sessionIds : Collections.emptySet(); - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java index dbb4b9bdf1e..91843e92874 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java @@ -30,11 +30,14 @@ import org.springframework.util.StringUtils; /** + * Supports destinations prefixed with "/user/{username}", transforms the + * destination to a unique queue to which the user is subscribed, and then sends + * the message for further processing. * - * Supports destinations prefixed with "/user/{username}" and resolves them into a - * destination to which the user is currently subscribed by appending the user session id. - * For example a destination such as "/user/john/queue/trade-confirmation" would resolve - * to "/queue/trade-confirmation/i9oqdfzo" if "i9oqdfzo" is the user's session id. + *

The target destination has the prefix removed and a unique queue suffix, + * resolved via {@link #setUserQueueSuffixResolver(UserQueueSuffixResolver)}, appended. + * For example a destination such as "/user/john/queue/trade-confirmation" could + * be transformed to "/queue/trade-confirmation/i9oqdfzo". * * @author Rossen Stoyanchev * @since 4.0 @@ -47,11 +50,22 @@ public class UserDestinationMessageHandler implements MessageHandler { private String prefix = "/user/"; - private UserSessionResolver userSessionResolver = new SimpleUserSessionResolver(); + private UserQueueSuffixResolver userQueueSuffixResolver = new SimpleUserQueueSuffixResolver(); - public UserDestinationMessageHandler(MessageSendingOperations messagingTemplate) { + /** + * + * @param messagingTemplate + * @param resolver the resolver to use to find queue suffixes for a user + */ + public UserDestinationMessageHandler(MessageSendingOperations messagingTemplate, + UserQueueSuffixResolver userQueueSuffixResolver) { + + Assert.notNull(messagingTemplate, "messagingTemplate is required"); + Assert.notNull(userQueueSuffixResolver, "userQueueSuffixResolver is required"); + this.messagingTemplate = messagingTemplate; + this.userQueueSuffixResolver = userQueueSuffixResolver; } /** @@ -71,17 +85,10 @@ public class UserDestinationMessageHandler implements MessageHandler { } /** - * @param userSessionResolver the userSessionResolver to set + * @return the resolver for queue suffixes for a user */ - public void setUserSessionResolver(UserSessionResolver userSessionResolver) { - this.userSessionResolver = userSessionResolver; - } - - /** - * @return the userSessionResolver - */ - public UserSessionResolver getUserSessionResolver() { - return this.userSessionResolver; + public UserQueueSuffixResolver getUserQueueSuffixResolver() { + return this.userQueueSuffixResolver; } /** @@ -116,7 +123,7 @@ public class UserDestinationMessageHandler implements MessageHandler { return; } - for (String sessionId : this.userSessionResolver.resolveUserSessionIds(user)) { + for (String sessionId : this.userQueueSuffixResolver.getUserQueueSuffixes(user)) { String targetDestination = destinationParser.getTargetDestination(sessionId); headers.setDestination(targetDestination); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java similarity index 57% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java index 18b06045007..0acf82c253f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java @@ -20,19 +20,29 @@ import java.util.Set; /** - * A strategy for resolving a user name to one or more session id's. + * A strategy for resolving unique queue suffixes for a connected user. + * There can be only one suffix per user per session. * * @author Rossen Stoyanchev * @since 4.0 */ -public interface UserSessionResolver { +public interface UserQueueSuffixResolver { /** - * Retrieve the sessionId(s) associated with the given user. + * Retrieve the suffixes for all sessions associated with this user. * * @param user the user name - * @return a Set with zero, one, or more, current session id's. + * @return a Set with zero, one, or more, queue suffixes */ - Set resolveUserSessionIds(String user); + Set getUserQueueSuffixes(String user); + + /** + * Retrieve the queue suffix associated with the given user session. + * + * @param user the user name + * @param sessionId the session id + * @return a queue suffix or {@code null} + */ + String getUserQueueSuffix(String user, String sessionId); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java index 6e4c1a096da..260de8687f7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java @@ -17,6 +17,7 @@ package org.springframework.messaging.simp.stomp; import java.io.IOException; import java.nio.charset.Charset; +import java.security.Principal; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -27,8 +28,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.simp.SimpMessageType; -import org.springframework.messaging.simp.handler.MutableUserSessionResolver; -import org.springframework.messaging.simp.handler.UserDestinationMessageHandler; +import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; @@ -51,17 +51,19 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement public static final String CONNECTED_USER_HEADER = "user-name"; /** - * A suffix unique to the current session that a client can append to a destination. - * @see UserDestinationMessageHandler + * A suffix unique to the current session that a client can use to append to + * a destination to make it unique. + * + * @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler} */ public static final String QUEUE_SUFFIX_HEADER = "queue-suffix"; private static Log logger = LogFactory.getLog(StompWebSocketHandler.class); - private MessageChannel clientInputChannel; + private MessageChannel dispatchChannel; - private MutableUserSessionResolver userSessionStore; + private MutableUserQueueSuffixResolver queueSuffixResolver; private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); @@ -69,29 +71,27 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement /** - * @param clientInputChannel the channel to which incoming STOMP/WebSocket messages should - * be sent to + * @param dispatchChannel the channel to send client STOMP/WebSocket messages to */ - public StompWebSocketHandler(MessageChannel clientInputChannel) { - Assert.notNull(clientInputChannel, "clientInputChannel is required"); - this.clientInputChannel = clientInputChannel; + public StompWebSocketHandler(MessageChannel dispatchChannel) { + Assert.notNull(dispatchChannel, "dispatchChannel is required"); + this.dispatchChannel = dispatchChannel; } /** - * Configure a store for saving user session information. - * @param userSessionStore the userSessionStore to use to store user session id's - * @see UserDestinationMessageHandler + * Configure a resolver to use to maintain queue suffixes for user + * @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler} */ - public void setUserSessionResolver(MutableUserSessionResolver userSessionStore) { - this.userSessionStore = userSessionStore; + public void setUserQueueSuffixResolver(MutableUserQueueSuffixResolver resolver) { + this.queueSuffixResolver = resolver; } /** - * @return the userSessionResolver + * @return the resolver for queue suffixes for a user */ - public MutableUserSessionResolver getUserSessionResolver() { - return this.userSessionStore; + public MutableUserQueueSuffixResolver getUserQueueSuffixResolver() { + return this.queueSuffixResolver; } public StompMessageConverter getStompMessageConverter() { @@ -101,12 +101,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { - Assert.notNull(this.clientInputChannel, "No output channel for STOMP messages."); this.sessions.put(session.getId(), session); - - if ((this.userSessionStore != null) && (session.getPrincipal() != null)) { - this.userSessionStore.addUserSessionId(session.getPrincipal().getName(), session.getId()); - } } /** @@ -135,7 +130,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement handleConnect(session, message); } - this.clientInputChannel.send(message); + this.dispatchChannel.send(message); } catch (Throwable t) { @@ -172,9 +167,15 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement } connectedHeaders.setHeartbeat(0,0); // TODO - if (session.getPrincipal() != null) { - connectedHeaders.setNativeHeader(CONNECTED_USER_HEADER, session.getPrincipal().getName()); + Principal principal = session.getPrincipal(); + if (principal != null) { + connectedHeaders.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); connectedHeaders.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId()); + + if (this.queueSuffixResolver != null) { + String suffix = session.getId(); + this.queueSuffixResolver.addQueueSuffix(principal.getName(), session.getId(), suffix); + } } // TODO: security @@ -204,14 +205,14 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement String sessionId = session.getId(); this.sessions.remove(sessionId); - if ((this.userSessionStore != null) && (session.getPrincipal() != null)) { - this.userSessionStore.removeUserSessionId(session.getPrincipal().getName(), sessionId); + if ((this.queueSuffixResolver != null) && (session.getPrincipal() != null)) { + this.queueSuffixResolver.removeQueueSuffix(session.getPrincipal().getName(), sessionId); } StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.setSessionId(sessionId); Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); - this.clientInputChannel.send(message); + this.dispatchChannel.send(message); } /** diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java similarity index 54% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolverTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java index 51a9bc9ab06..8a2386640c7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolverTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java @@ -27,12 +27,12 @@ import static org.junit.Assert.*; /** - * Test fixture for {@link SimpleUserSessionResolver} + * Test fixture for {@link SimpleUserQueueSuffixResolver} * * @author Rossen Stoyanchev * @since 4.0 */ -public class SimpleUserSessionResolverTests { +public class SimpleUserQueueSuffixResolverTests { private static final String user = "joe"; private static final List sessionIds = Arrays.asList("sess01", "sess02", "sess03"); @@ -41,42 +41,42 @@ public class SimpleUserSessionResolverTests { @Test public void addOneSessionId() { - SimpleUserSessionResolver resolver = new SimpleUserSessionResolver(); - resolver.addUserSessionId(user, sessionIds.get(0)); + SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); + resolver.addQueueSuffix(user, sessionIds.get(0), sessionIds.get(0)); - assertEquals(Collections.singleton(sessionIds.get(0)), resolver.resolveUserSessionIds(user)); - assertSame(Collections.emptySet(), resolver.resolveUserSessionIds("jane")); + assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user)); + assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes("jane")); } @Test public void addMultipleSessionIds() { - SimpleUserSessionResolver resolver = new SimpleUserSessionResolver(); + SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); for (String sessionId : sessionIds) { - resolver.addUserSessionId(user, sessionId); + resolver.addQueueSuffix(user, sessionId, sessionId); } - assertEquals(new LinkedHashSet<>(sessionIds), resolver.resolveUserSessionIds(user)); - assertEquals(Collections.emptySet(), resolver.resolveUserSessionIds("jane")); + assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user)); + assertEquals(Collections.emptySet(), resolver.getUserQueueSuffixes("jane")); } @Test public void removeSessionIds() { - SimpleUserSessionResolver resolver = new SimpleUserSessionResolver(); + SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); for (String sessionId : sessionIds) { - resolver.addUserSessionId(user, sessionId); + resolver.addQueueSuffix(user, sessionId, sessionId); } - assertEquals(new LinkedHashSet<>(sessionIds), resolver.resolveUserSessionIds(user)); + assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user)); - resolver.removeUserSessionId(user, sessionIds.get(1)); - resolver.removeUserSessionId(user, sessionIds.get(2)); - assertEquals(Collections.singleton(sessionIds.get(0)), resolver.resolveUserSessionIds(user)); + resolver.removeQueueSuffix(user, sessionIds.get(1)); + resolver.removeQueueSuffix(user, sessionIds.get(2)); + assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user)); - resolver.removeUserSessionId(user, sessionIds.get(0)); - assertSame(Collections.emptySet(), resolver.resolveUserSessionIds(user)); + resolver.removeQueueSuffix(user, sessionIds.get(0)); + assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes(user)); } }