Polish "user" destination support package
Issue: SPR-11620
This commit is contained in:
parent
3c5a9b4e1d
commit
c384945a17
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -32,21 +32,18 @@ import org.springframework.util.Assert;
|
|||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* A default implementation of {@link UserDestinationResolver} that relies
|
||||
* on the {@link org.springframework.messaging.simp.user.UserSessionRegistry}
|
||||
* provided to the constructor to find the sessionIds associated with a user
|
||||
* and then uses the sessionId to make the target destination unique.
|
||||
* A default implementation of {@code UserDestinationResolver} that relies
|
||||
* on a {@link org.springframework.messaging.simp.user.UserSessionRegistry} to
|
||||
* find active sessions for a user.
|
||||
*
|
||||
* <p>When a user attempts to subscribe to "/user/queue/position-updates", the
|
||||
* "/user" prefix is removed and a unique suffix added, resulting in something
|
||||
* like "/queue/position-updates-useri9oqdfzo" where the suffix is based on the
|
||||
* user's session and ensures it does not collide with any other users attempting
|
||||
* to subscribe to "/user/queue/position-updates".
|
||||
* <p>When a user attempts to subscribe, e.g. to "/user/queue/position-updates",
|
||||
* the "/user" prefix is removed and a unique suffix added based on the session
|
||||
* id, e.g. "/queue/position-updates-useri9oqdfzo" to ensure different users can
|
||||
* subscribe to the same logical destination without colliding.
|
||||
*
|
||||
* <p>When a message is sent to a user with a destination such as
|
||||
* "/user/{username}/queue/position-updates", the "/user/{username}" prefix is
|
||||
* removed and the suffix added, resulting in something like
|
||||
* "/queue/position-updates-useri9oqdfzo".
|
||||
* <p>When sending to a user, e.g. "/user/{username}/queue/position-updates", the
|
||||
* "/user/{username}" prefix is removed and a suffix based on active session id's
|
||||
* is added, e.g. "/queue/position-updates-useri9oqdfzo".
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Brian Clozel
|
||||
|
|
@ -57,40 +54,19 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
|
|||
private static final Log logger = LogFactory.getLog(DefaultUserDestinationResolver.class);
|
||||
|
||||
|
||||
private final UserSessionRegistry userSessionRegistry;
|
||||
private final UserSessionRegistry sessionRegistry;
|
||||
|
||||
private String destinationPrefix = "/user/";
|
||||
private String prefix = "/user/";
|
||||
|
||||
|
||||
/**
|
||||
* Create an instance that will access user session id information through
|
||||
* the provided registry.
|
||||
* @param userSessionRegistry the registry, never {@code null}
|
||||
* @param sessionRegistry the registry, never {@code null}
|
||||
*/
|
||||
public DefaultUserDestinationResolver(UserSessionRegistry userSessionRegistry) {
|
||||
Assert.notNull(userSessionRegistry, "'userSessionRegistry' must not be null");
|
||||
this.userSessionRegistry = userSessionRegistry;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The prefix used to identify user destinations. Any destinations that do not
|
||||
* start with the given prefix are not be resolved.
|
||||
* <p>The default value is "/user/".
|
||||
* @param prefix the prefix to use
|
||||
*/
|
||||
public void setUserDestinationPrefix(String prefix) {
|
||||
Assert.hasText(prefix, "prefix must not be empty");
|
||||
this.destinationPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the prefix used to identify user destinations. Any destinations that do not
|
||||
* start with the given prefix are not be resolved.
|
||||
* <p>By default "/user/queue/".
|
||||
*/
|
||||
public String getDestinationPrefix() {
|
||||
return this.destinationPrefix;
|
||||
public DefaultUserDestinationResolver(UserSessionRegistry sessionRegistry) {
|
||||
Assert.notNull(sessionRegistry, "'sessionRegistry' must not be null");
|
||||
this.sessionRegistry = sessionRegistry;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -98,76 +74,91 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
|
|||
* Return the configured {@link UserSessionRegistry}.
|
||||
*/
|
||||
public UserSessionRegistry getUserSessionRegistry() {
|
||||
return this.userSessionRegistry;
|
||||
return this.sessionRegistry;
|
||||
}
|
||||
|
||||
/**
|
||||
* The prefix used to identify user destinations. Any destinations that do not
|
||||
* start with the given prefix are not be resolved.
|
||||
* <p>The default prefix is "/user/".
|
||||
* @param prefix the prefix to use
|
||||
*/
|
||||
public void setUserDestinationPrefix(String prefix) {
|
||||
Assert.hasText(prefix, "prefix must not be empty");
|
||||
this.prefix = prefix.endsWith("/") ? prefix : prefix + "/";
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured prefix for user destinations.
|
||||
*/
|
||||
public String getDestinationPrefix() {
|
||||
return this.prefix;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public UserDestinationResult resolveDestination(Message<?> message) {
|
||||
String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
|
||||
DestinationInfo info = parseUserDestination(message);
|
||||
if (info == null) {
|
||||
String sourceDestination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
|
||||
ParseResult parseResult = parse(message);
|
||||
if (parseResult == null) {
|
||||
return null;
|
||||
}
|
||||
Set<String> resolved = new HashSet<String>();
|
||||
for (String sessionId : info.getSessionIds()) {
|
||||
String targetDestination = getTargetDestination(
|
||||
destination, info.getDestinationWithoutPrefix(), sessionId, info.getUser());
|
||||
String user = parseResult.getUser();
|
||||
Set<String> targetSet = new HashSet<String>();
|
||||
for (String sessionId : parseResult.getSessionIds()) {
|
||||
String actualDestination = parseResult.getActualDestination();
|
||||
String targetDestination = getTargetDestination(sourceDestination, actualDestination, sessionId, user);
|
||||
if (targetDestination != null) {
|
||||
resolved.add(targetDestination);
|
||||
targetSet.add(targetDestination);
|
||||
}
|
||||
}
|
||||
return new UserDestinationResult(destination, resolved, info.getSubscribeDestination(), info.getUser());
|
||||
String subscribeDestination = parseResult.getSubscribeDestination();
|
||||
return new UserDestinationResult(sourceDestination, targetSet, subscribeDestination, user);
|
||||
}
|
||||
|
||||
private DestinationInfo parseUserDestination(Message<?> message) {
|
||||
private ParseResult parse(Message<?> message) {
|
||||
MessageHeaders headers = message.getHeaders();
|
||||
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
|
||||
String destination = SimpMessageHeaderAccessor.getDestination(headers);
|
||||
Principal principal = SimpMessageHeaderAccessor.getUser(headers);
|
||||
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
|
||||
|
||||
String destinationWithoutPrefix;
|
||||
String subscribeDestination;
|
||||
String user;
|
||||
Set<String> sessionIds;
|
||||
|
||||
if (destination == null || !checkDestination(destination, this.destinationPrefix)) {
|
||||
if (destination == null || !checkDestination(destination, this.prefix)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
|
||||
Principal principal = SimpMessageHeaderAccessor.getUser(headers);
|
||||
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
|
||||
if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
|
||||
if (sessionId == null) {
|
||||
logger.error("No session id. Ignoring " + message);
|
||||
return null;
|
||||
}
|
||||
destinationWithoutPrefix = destination.substring(this.destinationPrefix.length()-1);
|
||||
subscribeDestination = destination;
|
||||
user = (principal != null ? principal.getName() : null);
|
||||
sessionIds = Collections.singleton(sessionId);
|
||||
int prefixEnd = this.prefix.length() - 1;
|
||||
String actualDestination = destination.substring(prefixEnd);
|
||||
String user = (principal != null ? principal.getName() : null);
|
||||
return new ParseResult(actualDestination, destination, Collections.singleton(sessionId), user);
|
||||
}
|
||||
else if (SimpMessageType.MESSAGE.equals(messageType)) {
|
||||
int startIndex = this.destinationPrefix.length();
|
||||
int endIndex = destination.indexOf('/', startIndex);
|
||||
Assert.isTrue(endIndex > 0, "Expected destination pattern \"/user/{userId}/**\"");
|
||||
destinationWithoutPrefix = destination.substring(endIndex);
|
||||
subscribeDestination = this.destinationPrefix.substring(0, startIndex-1) + destinationWithoutPrefix;
|
||||
user = destination.substring(startIndex, endIndex);
|
||||
int prefixEnd = this.prefix.length();
|
||||
int userEnd = destination.indexOf('/', prefixEnd);
|
||||
Assert.isTrue(userEnd > 0, "Expected destination pattern \"/user/{userId}/**\"");
|
||||
String actualDestination = destination.substring(userEnd);
|
||||
String subscribeDestination = this.prefix.substring(0, prefixEnd - 1) + actualDestination;
|
||||
String user = destination.substring(prefixEnd, userEnd);
|
||||
user = StringUtils.replace(user, "%2F", "/");
|
||||
Set<String> sessionIds;
|
||||
if (user.equals(sessionId)) {
|
||||
user = null;
|
||||
sessionIds = Collections.singleton(sessionId);
|
||||
}
|
||||
else if (this.userSessionRegistry.getSessionIds(user).contains(sessionId)) {
|
||||
else if (this.sessionRegistry.getSessionIds(user).contains(sessionId)) {
|
||||
sessionIds = Collections.singleton(sessionId);
|
||||
}
|
||||
else {
|
||||
sessionIds = this.userSessionRegistry.getSessionIds(user);
|
||||
sessionIds = this.sessionRegistry.getSessionIds(user);
|
||||
}
|
||||
return new ParseResult(actualDestination, subscribeDestination, sessionIds, user);
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
return new DestinationInfo(destinationWithoutPrefix, subscribeDestination, user, sessionIds);
|
||||
}
|
||||
|
||||
protected boolean checkDestination(String destination, String requiredPrefix) {
|
||||
|
|
@ -175,66 +166,62 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
|
|||
}
|
||||
|
||||
/**
|
||||
* This methods determines the translated destination to use based on the source
|
||||
* destination, the source destination with the user prefix removed, a session
|
||||
* id, and the user for the session (if known).
|
||||
* @param sourceDestination the source destination of the input message
|
||||
* @param sourceDestinationWithoutPrefix the source destination without the user prefix
|
||||
* @param sessionId the id of the session for the target message
|
||||
* @param user the user associated with the session, or {@code null}
|
||||
* This method determines how to translate the source "user" destination to an
|
||||
* actual target destination for the given active user session.
|
||||
* @param sourceDestination the source destination from the input message.
|
||||
* @param actualDestination a subset of the destination without any user prefix.
|
||||
* @param sessionId the id of an active user session, never {@code null}.
|
||||
* @param user the target user, possibly {@code null}, e.g if not authenticated.
|
||||
* @return a target destination, or {@code null} if none
|
||||
*/
|
||||
protected String getTargetDestination(String sourceDestination,
|
||||
String sourceDestinationWithoutPrefix, String sessionId, String user) {
|
||||
protected String getTargetDestination(String sourceDestination, String actualDestination,
|
||||
String sessionId, String user) {
|
||||
|
||||
return sourceDestinationWithoutPrefix + "-user" + sessionId;
|
||||
return actualDestination + "-user" + sessionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DefaultUserDestinationResolver[prefix=" + this.destinationPrefix + "]";
|
||||
return "DefaultUserDestinationResolver[prefix=" + this.prefix + "]";
|
||||
}
|
||||
|
||||
|
||||
private static class DestinationInfo {
|
||||
/**
|
||||
* A temporary placeholder for a parsed source "user" destination.
|
||||
*/
|
||||
private static class ParseResult {
|
||||
|
||||
private final String destinationWithoutPrefix;
|
||||
private final String actualDestination;
|
||||
|
||||
private final String subscribeDestination;
|
||||
|
||||
private final String user;
|
||||
|
||||
private final Set<String> sessionIds;
|
||||
|
||||
public DestinationInfo(String destinationWithoutPrefix, String subscribeDestination, String user,
|
||||
Set<String> sessionIds) {
|
||||
private final String user;
|
||||
|
||||
this.user = user;
|
||||
this.destinationWithoutPrefix = destinationWithoutPrefix;
|
||||
this.subscribeDestination = subscribeDestination;
|
||||
|
||||
public ParseResult(String actualDest, String subscribeDest, Set<String> sessionIds, String user) {
|
||||
this.actualDestination = actualDest;
|
||||
this.subscribeDestination = subscribeDest;
|
||||
this.sessionIds = sessionIds;
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
public String getDestinationWithoutPrefix() {
|
||||
return this.destinationWithoutPrefix;
|
||||
|
||||
public String getActualDestination() {
|
||||
return this.actualDestination;
|
||||
}
|
||||
|
||||
public String getSubscribeDestination() {
|
||||
return this.subscribeDestination;
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return this.user;
|
||||
}
|
||||
|
||||
public Set<String> getSessionIds() {
|
||||
return this.sessionIds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DestinationInfo[destination=" + this.destinationWithoutPrefix + ", subscribeDestination=" +
|
||||
this.subscribeDestination + ", user=" + this.user + ", sessionIds=" + this.sessionIds + "]";
|
||||
public String getUser() {
|
||||
return this.user;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -41,7 +41,7 @@ public class DefaultUserSessionRegistry implements UserSessionRegistry {
|
|||
@Override
|
||||
public Set<String> getSessionIds(String user) {
|
||||
Set<String> set = this.userSessionIds.get(user);
|
||||
return (set != null) ? set : Collections.<String>emptySet();
|
||||
return (set != null ? set : Collections.<String>emptySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -1,19 +1,33 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.messaging.simp.user;
|
||||
|
||||
/**
|
||||
* An interface to be implemented in addition to {@link java.security.Principal}
|
||||
* when {@link java.security.Principal#getName()} is not globally unique enough
|
||||
* for use in user destinations. For more on user destination see
|
||||
* {@link org.springframework.messaging.simp.user.UserDestinationResolver}.
|
||||
* A {@link java.security.Principal} can also implement this contract when
|
||||
* {@link java.security.Principal#getName() getName()} isn't globally unique
|
||||
* and therefore not suited for use with "user" destinations.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0.1
|
||||
* @see org.springframework.messaging.simp.user.UserDestinationResolver
|
||||
*/
|
||||
public interface DestinationUserNameProvider {
|
||||
|
||||
|
||||
/**
|
||||
* Return the (globally unique) user name to use with user destinations.
|
||||
* Return a globally unique user name for use with "user" destinations.
|
||||
*/
|
||||
String getDestinationUserName();
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -35,11 +35,11 @@ import org.springframework.messaging.support.MessageHeaderInitializer;
|
|||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Provides support for messages sent to "user" destinations, translating the
|
||||
* destination to one or more user-specific destination(s) and then sending message(s)
|
||||
* with the updated target destination using the provided messaging template.
|
||||
* <p>
|
||||
* See {@link UserDestinationResolver} for more details and examples.
|
||||
* {@code MessageHandler} with support for "user" destinations.
|
||||
*
|
||||
* <p>Listens for messages with "user" destinations, translates their destination
|
||||
* to actual target destinations unique to the active session(s) of a user, and
|
||||
* then sends the resolved messages to the broker channel to be delivered.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
|
|
@ -53,9 +53,9 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||
|
||||
private final SubscribableChannel brokerChannel;
|
||||
|
||||
private final MessageSendingOperations<String> brokerMessagingTemplate;
|
||||
private final MessageSendingOperations<String> messagingTemplate;
|
||||
|
||||
private final UserDestinationResolver userDestinationResolver;
|
||||
private final UserDestinationResolver destinationResolver;
|
||||
|
||||
private MessageHeaderInitializer headerInitializer;
|
||||
|
||||
|
|
@ -65,54 +65,53 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||
|
||||
|
||||
/**
|
||||
* Create an instance of the handler with the given messaging template and a
|
||||
* user destination resolver.
|
||||
* @param clientInChannel the channel for receiving messages from clients (e.g. WebSocket clients)
|
||||
* @param brokerChannel the channel for sending messages with translated user destinations
|
||||
* @param userDestinationResolver the resolver to use to find queue suffixes for a user
|
||||
* Create an instance with the given client and broker channels subscribing
|
||||
* to handle messages from each and then sending any resolved messages to the
|
||||
* broker channel.
|
||||
* @param clientInboundChannel messages received from clients.
|
||||
* @param brokerChannel messages sent to the broker.
|
||||
* @param resolver the resolver for "user" destinations.
|
||||
*/
|
||||
public UserDestinationMessageHandler(SubscribableChannel clientInChannel,
|
||||
SubscribableChannel brokerChannel, UserDestinationResolver userDestinationResolver) {
|
||||
public UserDestinationMessageHandler(SubscribableChannel clientInboundChannel,
|
||||
SubscribableChannel brokerChannel, UserDestinationResolver resolver) {
|
||||
|
||||
Assert.notNull(clientInChannel, "'clientInChannel' must not be null");
|
||||
Assert.notNull(clientInboundChannel, "'clientInChannel' must not be null");
|
||||
Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
|
||||
Assert.notNull(userDestinationResolver, "DestinationResolver must not be null");
|
||||
Assert.notNull(resolver, "resolver must not be null");
|
||||
|
||||
this.clientInboundChannel = clientInChannel;
|
||||
this.clientInboundChannel = clientInboundChannel;
|
||||
this.brokerChannel = brokerChannel;
|
||||
this.brokerMessagingTemplate = new SimpMessagingTemplate(brokerChannel);
|
||||
this.userDestinationResolver = userDestinationResolver;
|
||||
this.messagingTemplate = new SimpMessagingTemplate(brokerChannel);
|
||||
this.destinationResolver = resolver;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the configured messaging template for sending messages with
|
||||
* translated destinations.
|
||||
*/
|
||||
public MessageSendingOperations<String> getBrokerMessagingTemplate() {
|
||||
return this.brokerMessagingTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured {@link UserDestinationResolver}.
|
||||
*/
|
||||
public UserDestinationResolver getUserDestinationResolver() {
|
||||
return this.userDestinationResolver;
|
||||
return this.destinationResolver;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure a {@link MessageHeaderInitializer} to pass on to
|
||||
* {@link org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler}s
|
||||
* that send messages from controller return values.
|
||||
*
|
||||
* <p>By default this property is not set.
|
||||
* Return the messaging template used to send resolved messages to the
|
||||
* broker channel.
|
||||
*/
|
||||
public MessageSendingOperations<String> getBrokerMessagingTemplate() {
|
||||
return this.messagingTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure a custom {@link MessageHeaderInitializer} to initialize the
|
||||
* headers of resolved target messages.
|
||||
* <p>By default this is not set.
|
||||
*/
|
||||
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
|
||||
this.headerInitializer = headerInitializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the configured header initializer.
|
||||
* Return the configured header initializer.
|
||||
*/
|
||||
public MessageHeaderInitializer getHeaderInitializer() {
|
||||
return this.headerInitializer;
|
||||
|
|
@ -165,7 +164,7 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
UserDestinationResult result = this.userDestinationResolver.resolveDestination(message);
|
||||
UserDestinationResult result = this.destinationResolver.resolveDestination(message);
|
||||
if (result == null) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -177,17 +176,17 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||
return;
|
||||
}
|
||||
if (SimpMessageType.MESSAGE.equals(SimpMessageHeaderAccessor.getMessageType(message.getHeaders()))) {
|
||||
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.wrap(message);
|
||||
initHeaders(headerAccessor);
|
||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
|
||||
initHeaders(accessor);
|
||||
String header = SimpMessageHeaderAccessor.ORIGINAL_DESTINATION;
|
||||
headerAccessor.setNativeHeader(header, result.getSubscribeDestination());
|
||||
message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
|
||||
accessor.setNativeHeader(header, result.getSubscribeDestination());
|
||||
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Translated " + result.getSourceDestination() + " -> " + destinations);
|
||||
}
|
||||
for (String destination : destinations) {
|
||||
this.brokerMessagingTemplate.send(destination, message);
|
||||
this.messagingTemplate.send(destination, message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -199,7 +198,7 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "UserDestinationMessageHandler[" + this.userDestinationResolver + "]";
|
||||
return "UserDestinationMessageHandler[" + this.destinationResolver + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -19,17 +19,14 @@ package org.springframework.messaging.simp.user;
|
|||
import org.springframework.messaging.Message;
|
||||
|
||||
/**
|
||||
* A strategy for resolving a "user" destination and translating it to one or more
|
||||
* actual destinations unique to the user's active session(s).
|
||||
* <p>
|
||||
* For messages sent to a user, the destination must contain the name of the target
|
||||
* user, The name, extracted from the destination, is used to look up the active
|
||||
* user session(s), and then translate the destination accordingly.
|
||||
* <p>
|
||||
* For SUBSCRIBE and UNSUBSCRIBE messages, the user is the user associated with
|
||||
* the message. In other words the destination does not contain the user name.
|
||||
* <p>
|
||||
* See the documentation on implementations for specific examples.
|
||||
* A strategy for resolving a "user" destination by translating it to one or more
|
||||
* actual destinations one per active user session. When sending a message to a
|
||||
* user destination, the destination must contain the user name so it may be
|
||||
* extracted and used to look up the user sessions. When subscribing to a user
|
||||
* destination, the destination does not have to contain the user's own name.
|
||||
* We simply use the current session.
|
||||
*
|
||||
* <p>See implementation classes and the documentation for example destinations.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
|
|
@ -40,18 +37,11 @@ import org.springframework.messaging.Message;
|
|||
public interface UserDestinationResolver {
|
||||
|
||||
/**
|
||||
* Resolve the destination of the message to a set of actual target destinations.
|
||||
* <p>
|
||||
* If the message is SUBSCRIBE/UNSUBSCRIBE, the returned set will contain a
|
||||
* single translated target destination.
|
||||
* <p>
|
||||
* If the message represents data being sent to a user, the returned set may
|
||||
* contain multiple target destinations, one for each active user session.
|
||||
*
|
||||
* @param message the message with a user destination to be resolved
|
||||
*
|
||||
* @return the result of the resolution, or {@code null} if the resolution
|
||||
* fails (e.g. not a user destination, or no user info available, etc)
|
||||
* Resolve the given message with a user destination to one or more messages
|
||||
* with actual destinations, one for each active user session.
|
||||
* @param message the message to try to resolve
|
||||
* @return 0 or more target messages (one for each active session), or
|
||||
* {@code null} if the source message does not contain a user destination.
|
||||
*/
|
||||
UserDestinationResult resolveDestination(Message<?> message);
|
||||
|
||||
|
|
|
|||
|
|
@ -21,12 +21,12 @@ import java.util.Set;
|
|||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* A simple container for the result of parsing and translating a "user" destination
|
||||
* in some source message into a set of actual target destinations by calling
|
||||
* {@link org.springframework.messaging.simp.user.UserDestinationResolver}.
|
||||
* Contains the result from parsing a "user" destination from a source message
|
||||
* and translating it to target destinations (one per active user session).
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0.2
|
||||
* @see org.springframework.messaging.simp.user.UserDestinationResolver
|
||||
*/
|
||||
public class UserDestinationResult {
|
||||
|
||||
|
|
@ -54,39 +54,40 @@ public class UserDestinationResult {
|
|||
|
||||
|
||||
/**
|
||||
* The "user" destination as found in the headers of the source message.
|
||||
*
|
||||
* @return a destination, never {@code null}
|
||||
* The "user" destination from the source message. This may look like
|
||||
* "/user/queue/position-updates" when subscribing or
|
||||
* "/user/{username}/queue/position-updates" when sending a message.
|
||||
* @return the "user" destination, never {@code null}.
|
||||
*/
|
||||
public String getSourceDestination() {
|
||||
return this.sourceDestination;
|
||||
}
|
||||
|
||||
/**
|
||||
* The result of parsing the source destination and translating it into a set
|
||||
* of actual target destinations to use.
|
||||
*
|
||||
* @return a set of destination values, possibly an empty set
|
||||
* The target destinations that the source destination was translated to,
|
||||
* one per active user session, e.g. "/queue/position-updates-useri9oqdfzo".
|
||||
* @return the target destinations, never {@code null} but possibly an empty
|
||||
* set if there are no active sessions for the user.
|
||||
*/
|
||||
public Set<String> getTargetDestinations() {
|
||||
return this.targetDestinations;
|
||||
}
|
||||
|
||||
/**
|
||||
* The canonical form of the user destination as would be required to subscribe.
|
||||
* This may be useful to ensure that messages received by clients contain the
|
||||
* original destination they used to subscribe.
|
||||
*
|
||||
* @return a destination, never {@code null}
|
||||
* The user destination in the form expected when a client subscribes, e.g.
|
||||
* "/user/queue/position-updates".
|
||||
* @return the subscribe form of the "user" destination, never {@code null}.
|
||||
*/
|
||||
public String getSubscribeDestination() {
|
||||
return this.subscribeDestination;
|
||||
}
|
||||
|
||||
/**
|
||||
* The user associated with the user destination.
|
||||
*
|
||||
* @return the user name, never {@code null}
|
||||
* The user for this user destination.
|
||||
* @return the user name or {@code null} if we have a session id only such as
|
||||
* when the user is not authenticated; in such cases it is possible to use
|
||||
* sessionId in place of a user name thus removing the need for a user-to-session
|
||||
* lookup via {@link org.springframework.messaging.simp.user.UserSessionRegistry}.
|
||||
*/
|
||||
public String getUser() {
|
||||
return this.user;
|
||||
|
|
|
|||
|
|
@ -1,12 +1,26 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.simp.user;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A registry for looking up active session id's by user.
|
||||
*
|
||||
* <p>Used in support of resolving unique session-specific user destinations.
|
||||
* See {@link DefaultUserDestinationResolver} for more details.
|
||||
* A registry for looking up active user sessions. For use when resolving user
|
||||
* destinations.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
|
|
@ -14,24 +28,22 @@ import java.util.Set;
|
|||
*/
|
||||
public interface UserSessionRegistry {
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Return the active session id's for the given user.
|
||||
* Return the active session id's for the user.
|
||||
* @param user the user
|
||||
* @return a set with 0 or more session id's
|
||||
* @return a set with 0 or more session id's, never {@code null}.
|
||||
*/
|
||||
Set<String> getSessionIds(String user);
|
||||
|
||||
/**
|
||||
* Register an active session id for the given user.
|
||||
* Register an active session id for a user.
|
||||
* @param user the user
|
||||
* @param sessionId the session id
|
||||
*/
|
||||
void registerSessionId(String user, String sessionId);
|
||||
|
||||
/**
|
||||
* Unregister the session id for a user.
|
||||
* Unregister an active session id for a user.
|
||||
* @param user the user
|
||||
* @param sessionId the session id
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
package org.springframework.messaging.simp.user;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
@ -26,8 +28,6 @@ import org.springframework.messaging.simp.TestPrincipal;
|
|||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Unit tests for
|
||||
* {@link org.springframework.messaging.simp.user.DefaultUserDestinationResolver}.
|
||||
|
|
@ -57,7 +57,7 @@ public class DefaultUserDestinationResolverTests {
|
|||
@Test
|
||||
public void handleSubscribe() {
|
||||
String sourceDestination = "/user/queue/foo";
|
||||
Message<?> message = createMessage(SimpMessageType.SUBSCRIBE, this.user, SESSION_ID, sourceDestination);
|
||||
Message<?> message = createWith(SimpMessageType.SUBSCRIBE, this.user, SESSION_ID, sourceDestination);
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
|
||||
assertEquals(sourceDestination, actual.getSourceDestination());
|
||||
|
|
@ -75,7 +75,7 @@ public class DefaultUserDestinationResolverTests {
|
|||
this.registry.registerSessionId("joe", "456");
|
||||
this.registry.registerSessionId("joe", "789");
|
||||
|
||||
Message<?> message = createMessage(SimpMessageType.SUBSCRIBE, this.user, SESSION_ID, "/user/queue/foo");
|
||||
Message<?> message = createWith(SimpMessageType.SUBSCRIBE, this.user, SESSION_ID, "/user/queue/foo");
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
|
||||
assertEquals(1, actual.getTargetDestinations().size());
|
||||
|
|
@ -85,7 +85,7 @@ public class DefaultUserDestinationResolverTests {
|
|||
@Test
|
||||
public void handleSubscribeNoUser() {
|
||||
String sourceDestination = "/user/queue/foo";
|
||||
Message<?> message = createMessage(SimpMessageType.SUBSCRIBE, null, SESSION_ID, sourceDestination);
|
||||
Message<?> message = createWith(SimpMessageType.SUBSCRIBE, null, SESSION_ID, sourceDestination);
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
|
||||
assertEquals(sourceDestination, actual.getSourceDestination());
|
||||
|
|
@ -97,7 +97,7 @@ public class DefaultUserDestinationResolverTests {
|
|||
|
||||
@Test
|
||||
public void handleUnsubscribe() {
|
||||
Message<?> message = createMessage(SimpMessageType.UNSUBSCRIBE, this.user, SESSION_ID, "/user/queue/foo");
|
||||
Message<?> message = createWith(SimpMessageType.UNSUBSCRIBE, this.user, SESSION_ID, "/user/queue/foo");
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
|
||||
assertEquals(1, actual.getTargetDestinations().size());
|
||||
|
|
@ -107,7 +107,7 @@ public class DefaultUserDestinationResolverTests {
|
|||
@Test
|
||||
public void handleMessage() {
|
||||
String sourceDestination = "/user/joe/queue/foo";
|
||||
Message<?> message = createMessage(SimpMessageType.MESSAGE, this.user, SESSION_ID, sourceDestination);
|
||||
Message<?> message = createWith(SimpMessageType.MESSAGE, this.user, SESSION_ID, sourceDestination);
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
|
||||
assertEquals(sourceDestination, actual.getSourceDestination());
|
||||
|
|
@ -126,7 +126,7 @@ public class DefaultUserDestinationResolverTests {
|
|||
String sourceDestination = "/user/"+OTHER_USER_NAME+"/queue/foo";
|
||||
TestPrincipal otherUser = new TestPrincipal(OTHER_USER_NAME);
|
||||
this.registry.registerSessionId(otherUser.getName(), OTHER_SESSION_ID);
|
||||
Message<?> message = createMessage(SimpMessageType.MESSAGE, this.user, SESSION_ID, sourceDestination);
|
||||
Message<?> message = createWith(SimpMessageType.MESSAGE, this.user, SESSION_ID, sourceDestination);
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
|
||||
assertEquals(sourceDestination, actual.getSourceDestination());
|
||||
|
|
@ -142,7 +142,7 @@ public class DefaultUserDestinationResolverTests {
|
|||
String userName = "http://joe.openid.example.org/";
|
||||
this.registry.registerSessionId(userName, "openid123");
|
||||
String destination = "/user/" + StringUtils.replace(userName, "/", "%2F") + "/queue/foo";
|
||||
Message<?> message = createMessage(SimpMessageType.MESSAGE, this.user, null, destination);
|
||||
Message<?> message = createWith(SimpMessageType.MESSAGE, this.user, null, destination);
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
|
||||
assertEquals(1, actual.getTargetDestinations().size());
|
||||
|
|
@ -152,7 +152,7 @@ public class DefaultUserDestinationResolverTests {
|
|||
@Test
|
||||
public void handleMessageWithNoUser() {
|
||||
String sourceDestination = "/user/" + SESSION_ID + "/queue/foo";
|
||||
Message<?> message = createMessage(SimpMessageType.MESSAGE, null, SESSION_ID, sourceDestination);
|
||||
Message<?> message = createWith(SimpMessageType.MESSAGE, null, SESSION_ID, sourceDestination);
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
|
||||
assertEquals(sourceDestination, actual.getSourceDestination());
|
||||
|
|
@ -166,29 +166,29 @@ public class DefaultUserDestinationResolverTests {
|
|||
public void ignoreMessage() {
|
||||
|
||||
// no destination
|
||||
Message<?> message = createMessage(SimpMessageType.MESSAGE, this.user, SESSION_ID, null);
|
||||
Message<?> message = createWith(SimpMessageType.MESSAGE, this.user, SESSION_ID, null);
|
||||
UserDestinationResult actual = this.resolver.resolveDestination(message);
|
||||
assertNull(actual);
|
||||
|
||||
// not a user destination
|
||||
message = createMessage(SimpMessageType.MESSAGE, this.user, SESSION_ID, "/queue/foo");
|
||||
message = createWith(SimpMessageType.MESSAGE, this.user, SESSION_ID, "/queue/foo");
|
||||
actual = this.resolver.resolveDestination(message);
|
||||
assertNull(actual);
|
||||
|
||||
// subscribe + not a user destination
|
||||
message = createMessage(SimpMessageType.SUBSCRIBE, this.user, SESSION_ID, "/queue/foo");
|
||||
message = createWith(SimpMessageType.SUBSCRIBE, this.user, SESSION_ID, "/queue/foo");
|
||||
actual = this.resolver.resolveDestination(message);
|
||||
assertNull(actual);
|
||||
|
||||
// no match on message type
|
||||
message = createMessage(SimpMessageType.CONNECT, this.user, SESSION_ID, "user/joe/queue/foo");
|
||||
message = createWith(SimpMessageType.CONNECT, this.user, SESSION_ID, "user/joe/queue/foo");
|
||||
actual = this.resolver.resolveDestination(message);
|
||||
assertNull(actual);
|
||||
}
|
||||
|
||||
|
||||
private Message<?> createMessage(SimpMessageType messageType, TestPrincipal user, String sessionId, String destination) {
|
||||
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(messageType);
|
||||
private Message<?> createWith(SimpMessageType type, TestPrincipal user, String sessionId, String destination) {
|
||||
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(type);
|
||||
if (destination != null) {
|
||||
headers.setDestination(destination);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
package org.springframework.messaging.simp.user;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashSet;
|
||||
|
|
@ -23,10 +25,9 @@ import java.util.List;
|
|||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Test fixture for {@link org.springframework.messaging.simp.user.DefaultUserSessionRegistry}
|
||||
* Test fixture for
|
||||
* {@link org.springframework.messaging.simp.user.DefaultUserSessionRegistry}
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
|
|
@ -57,10 +58,9 @@ public class DefaultUserSessionRegistryTests {
|
|||
}
|
||||
|
||||
assertEquals(new LinkedHashSet<>(sessionIds), resolver.getSessionIds(user));
|
||||
assertEquals(Collections.emptySet(), resolver.getSessionIds("jane"));
|
||||
assertEquals(Collections.<String>emptySet(), resolver.getSessionIds("jane"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void removeSessionIds() {
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -16,6 +16,10 @@
|
|||
|
||||
package org.springframework.messaging.simp.user;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.BDDMockito.*;
|
||||
import static org.springframework.messaging.simp.SimpMessageHeaderAccessor.ORIGINAL_DESTINATION;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
|
@ -31,30 +35,29 @@ import org.springframework.messaging.simp.SimpMessageType;
|
|||
import org.springframework.messaging.simp.TestPrincipal;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.BDDMockito.*;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link org.springframework.messaging.simp.user.UserDestinationMessageHandler}.
|
||||
* Unit tests for
|
||||
* {@link org.springframework.messaging.simp.user.UserDestinationMessageHandler}.
|
||||
*/
|
||||
public class UserDestinationMessageHandlerTests {
|
||||
|
||||
public static final String SESSION_ID = "123";
|
||||
private UserDestinationMessageHandler messageHandler;
|
||||
private static final String SESSION_ID = "123";
|
||||
|
||||
|
||||
private UserDestinationMessageHandler handler;
|
||||
|
||||
private UserSessionRegistry registry;
|
||||
|
||||
@Mock
|
||||
private SubscribableChannel brokerChannel;
|
||||
|
||||
private UserSessionRegistry registry;
|
||||
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
this.registry = new DefaultUserSessionRegistry();
|
||||
DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(this.registry);
|
||||
this.messageHandler = new UserDestinationMessageHandler(new StubMessageChannel(), this.brokerChannel, resolver);
|
||||
UserDestinationResolver resolver = new DefaultUserDestinationResolver(this.registry);
|
||||
this.handler = new UserDestinationMessageHandler(new StubMessageChannel(), this.brokerChannel, resolver);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -62,24 +65,26 @@ public class UserDestinationMessageHandlerTests {
|
|||
@SuppressWarnings("rawtypes")
|
||||
public void handleSubscribe() {
|
||||
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
|
||||
this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, "joe", SESSION_ID, "/user/queue/foo"));
|
||||
this.handler.handleMessage(createWith(SimpMessageType.SUBSCRIBE, "joe", SESSION_ID, "/user/queue/foo"));
|
||||
|
||||
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
|
||||
Mockito.verify(this.brokerChannel).send(captor.capture());
|
||||
|
||||
assertEquals("/queue/foo-user123", SimpMessageHeaderAccessor.getDestination(captor.getValue().getHeaders()));
|
||||
Message message = captor.getValue();
|
||||
assertEquals("/queue/foo-user123", SimpMessageHeaderAccessor.getDestination(message.getHeaders()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("rawtypes")
|
||||
public void handleUnsubscribe() {
|
||||
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
|
||||
this.messageHandler.handleMessage(createMessage(SimpMessageType.UNSUBSCRIBE, "joe", "123", "/user/queue/foo"));
|
||||
this.handler.handleMessage(createWith(SimpMessageType.UNSUBSCRIBE, "joe", "123", "/user/queue/foo"));
|
||||
|
||||
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
|
||||
Mockito.verify(this.brokerChannel).send(captor.capture());
|
||||
|
||||
assertEquals("/queue/foo-user123", SimpMessageHeaderAccessor.getDestination(captor.getValue().getHeaders()));
|
||||
Message message = captor.getValue();
|
||||
assertEquals("/queue/foo-user123", SimpMessageHeaderAccessor.getDestination(message.getHeaders()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -87,14 +92,14 @@ public class UserDestinationMessageHandlerTests {
|
|||
public void handleMessage() {
|
||||
this.registry.registerSessionId("joe", "123");
|
||||
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
|
||||
this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", "123", "/user/joe/queue/foo"));
|
||||
this.handler.handleMessage(createWith(SimpMessageType.MESSAGE, "joe", "123", "/user/joe/queue/foo"));
|
||||
|
||||
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
|
||||
Mockito.verify(this.brokerChannel).send(captor.capture());
|
||||
|
||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(captor.getValue());
|
||||
assertEquals("/queue/foo-user123", accessor.getDestination());
|
||||
assertEquals("/user/queue/foo", accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
|
||||
assertEquals("/user/queue/foo", accessor.getFirstNativeHeader(ORIGINAL_DESTINATION));
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -102,25 +107,25 @@ public class UserDestinationMessageHandlerTests {
|
|||
public void ignoreMessage() {
|
||||
|
||||
// no destination
|
||||
this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", "123", null));
|
||||
this.handler.handleMessage(createWith(SimpMessageType.MESSAGE, "joe", "123", null));
|
||||
Mockito.verifyZeroInteractions(this.brokerChannel);
|
||||
|
||||
// not a user destination
|
||||
this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", "123", "/queue/foo"));
|
||||
this.handler.handleMessage(createWith(SimpMessageType.MESSAGE, "joe", "123", "/queue/foo"));
|
||||
Mockito.verifyZeroInteractions(this.brokerChannel);
|
||||
|
||||
// subscribe + not a user destination
|
||||
this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, "joe", "123", "/queue/foo"));
|
||||
this.handler.handleMessage(createWith(SimpMessageType.SUBSCRIBE, "joe", "123", "/queue/foo"));
|
||||
Mockito.verifyZeroInteractions(this.brokerChannel);
|
||||
|
||||
// no match on message type
|
||||
this.messageHandler.handleMessage(createMessage(SimpMessageType.CONNECT, "joe", "123", "user/joe/queue/foo"));
|
||||
this.handler.handleMessage(createWith(SimpMessageType.CONNECT, "joe", "123", "user/joe/queue/foo"));
|
||||
Mockito.verifyZeroInteractions(this.brokerChannel);
|
||||
}
|
||||
|
||||
|
||||
private Message<?> createMessage(SimpMessageType messageType, String user, String sessionId, String destination) {
|
||||
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(messageType);
|
||||
private Message<?> createWith(SimpMessageType type, String user, String sessionId, String destination) {
|
||||
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(type);
|
||||
if (destination != null) {
|
||||
headers.setDestination(destination);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue