Allow use of @SendToUser even w/o authenticated user

Before this change, subscribing to a user destination and use of
@SendToUser annotation required an authenticated user.

This change makes it possible to subscribe to a user destination from
WebSocket sessions without an authenticated user. In such cases the
destination is associated with one session only rather than with a
user (and all their sessions).

It is then also possible to send a message to a user destination
via "/user/{sessionId}/.." rather than "/user/{user}/...".

That means @SendToUser works relying on the session id of the input
message, effectively sending a reply to destination private to the
session.

A key use case for this is handling an exception with an
@MessageExceptionHandler method and sending a reply with @SendToUser.

Issue: SPR-11309
This commit is contained in:
Rossen Stoyanchev 2014-05-08 19:18:53 -04:00
parent 3c7bb9c279
commit 97fb308b6b
13 changed files with 229 additions and 70 deletions

View File

@ -145,10 +145,18 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
SendToUser sendToUser = returnType.getMethodAnnotation(SendToUser.class);
if (sendToUser != null) {
boolean broadcast = sendToUser.broadcast();
String user = getUserName(message, headers);
if (user == null) {
if (sessionId == null) {
throw new MissingSessionUserException(message);
}
user = sessionId;
broadcast = false;
}
String[] destinations = getTargetDestinations(sendToUser, message, this.defaultUserDestinationPrefix);
for (String destination : destinations) {
if (sendToUser.broadcast()) {
if (broadcast) {
this.messagingTemplate.convertAndSendToUser(user, destination, returnValue);
}
else {
@ -168,13 +176,11 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
protected String getUserName(Message<?> message, MessageHeaders headers) {
Principal principal = SimpMessageHeaderAccessor.getUser(headers);
if (principal == null) {
throw new MissingSessionUserException(message);
if (principal != null) {
return (principal instanceof DestinationUserNameProvider ?
((DestinationUserNameProvider) principal).getDestinationUserName() : principal.getName());
}
if (principal instanceof DestinationUserNameProvider) {
return ((DestinationUserNameProvider) principal).getDestinationUserName();
}
return principal.getName();
return null;
}
protected String[] getTargetDestinations(Annotation annotation, Message<?> message, String defaultPrefix) {

View File

@ -107,14 +107,15 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
return null;
}
Set<String> targetDestinations = new HashSet<String>();
Set<String> resolved = new HashSet<String>();
for (String sessionId : info.getSessionIds()) {
targetDestinations.add(getTargetDestination(destination,
info.getDestinationWithoutPrefix(), sessionId, info.getUser()));
String d = getTargetDestination(destination, info.getDestinationWithoutPrefix(), sessionId, info.getUser());
if (d != null) {
resolved.add(d);
}
}
return new UserDestinationResult(destination,
targetDestinations, info.getSubscribeDestination(), info.getUser());
return new UserDestinationResult(destination, resolved, info.getSubscribeDestination(), info.getUser());
}
private DestinationInfo parseUserDestination(Message<?> message) {
@ -134,17 +135,13 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
if (!checkDestination(destination, this.destinationPrefix)) {
return null;
}
if (principal == null) {
logger.error("Ignoring message, no principal info available");
return null;
}
if (sessionId == null) {
logger.error("Ignoring message, no session id available");
return null;
}
destinationWithoutPrefix = destination.substring(this.destinationPrefix.length()-1);
subscribeDestination = destination;
user = principal.getName();
user = (principal != null ? principal.getName() : null);
sessionIds = Collections.singleton(sessionId);
}
else if (SimpMessageType.MESSAGE.equals(messageType)) {
@ -153,11 +150,12 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
}
int startIndex = this.destinationPrefix.length();
int endIndex = destination.indexOf('/', startIndex);
Assert.isTrue(endIndex > 0, "Expected destination pattern \"/principal/{userId}/**\"");
Assert.isTrue(endIndex > 0, "Expected destination pattern \"/user/{userId}/**\"");
destinationWithoutPrefix = destination.substring(endIndex);
subscribeDestination = this.destinationPrefix.substring(0, startIndex-1) + destinationWithoutPrefix;
user = destination.substring(startIndex, endIndex);
user = StringUtils.replace(user, "%2F", "/");
user = user.equals(sessionId) ? null : user;
sessionIds = (sessionId != null ?
Collections.singleton(sessionId) : this.userSessionRegistry.getSessionIds(user));
}
@ -186,14 +184,16 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
}
/**
* Return the target destination to use. Provided as input are the original source
* destination, as well as the same destination with the target prefix removed.
* This methods determines the translated destination to use based on the source
* destination, the source destination with the user prefix removed, a session
* id, and the user for the session (if known).
*
* @param sourceDestination the source destination from the input message
* @param sourceDestinationWithoutPrefix the source destination with the target prefix removed
* @param sessionId an active user session id
* @param user the user
* @return the target destination
* @param sourceDestination the source destination of the input message
* @param sourceDestinationWithoutPrefix the source destination without the user prefix
* @param sessionId the id of the session for the target message
* @param user the user associated with the session, or {@code null}
*
* @return a target destination, or {@code null}
*/
protected String getTargetDestination(String sourceDestination,
String sourceDestinationWithoutPrefix, String sessionId, String user) {

View File

@ -171,12 +171,16 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
}
Set<String> destinations = result.getTargetDestinations();
if (destinations.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("No target destinations, message=" + message);
}
return;
}
if (SimpMessageType.MESSAGE.equals(SimpMessageHeaderAccessor.getMessageType(message.getHeaders()))) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.wrap(message);
initHeaders(headerAccessor);
headerAccessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
String header = SimpMessageHeaderAccessor.ORIGINAL_DESTINATION;
headerAccessor.setNativeHeader(header, result.getSubscribeDestination());
message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
}
for (String destination : destinations) {

View File

@ -45,7 +45,6 @@ public class UserDestinationResult {
Assert.notNull(sourceDestination, "'sourceDestination' must not be null");
Assert.notNull(targetDestinations, "'targetDestinations' must not be null");
Assert.notNull(subscribeDestination, "'subscribeDestination' must not be null");
Assert.notNull(user, "'user' must not be null");
this.sourceDestination = sourceDestination;
this.targetDestinations = targetDestinations;

View File

@ -186,4 +186,13 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
setModified(true);
}
public List<String> removeNativeHeader(String name) {
Assert.state(isMutable(), "Already immutable");
Map<String, List<String>> nativeHeaders = getNativeHeaders();
if (nativeHeaders == null) {
return null;
}
return nativeHeaders.remove(name);
}
}

View File

@ -332,6 +332,26 @@ public class SendToMethodReturnValueHandlerTests {
verifyNoMoreInteractions(messagingTemplate);
}
@Test
public void sendToUserSessionWithoutUserName() throws Exception {
when(this.messageChannel.send(any(Message.class))).thenReturn(true);
String sessionId = "sess1";
Message<?> inputMessage = createInputMessage(sessionId, "sub1", null, null, null);
this.handler.handleReturnValue(PAYLOAD, this.sendToUserReturnType, inputMessage);
verify(this.messageChannel, times(2)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(this.messageCaptor.getAllValues().get(0));
assertEquals("/user/sess1/dest1", headers.getDestination());
assertEquals("sess1", headers.getSessionId());
headers = SimpMessageHeaderAccessor.wrap(this.messageCaptor.getAllValues().get(1));
assertEquals("/user/sess1/dest2", headers.getDestination());
assertEquals("sess1", headers.getSessionId());
}
private Message<?> createInputMessage(String sessId, String subsId, String destinationPrefix,
String destination, Principal principal) {

View File

@ -29,7 +29,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Unit tests for {@link org.springframework.messaging.simp.user.DefaultUserDestinationResolver}.
* Unit tests for
* {@link org.springframework.messaging.simp.user.DefaultUserDestinationResolver}.
*
* @author Rossen Stoyanchev
*/
@ -81,6 +82,19 @@ public class DefaultUserDestinationResolverTests {
assertEquals("/queue/foo-user123", actual.getTargetDestinations().iterator().next());
}
@Test
public void handleSubscribeNoUser() {
String sourceDestination = "/user/queue/foo";
Message<?> message = createMessage(SimpMessageType.SUBSCRIBE, null, SESSION_ID, sourceDestination);
UserDestinationResult actual = this.resolver.resolveDestination(message);
assertEquals(sourceDestination, actual.getSourceDestination());
assertEquals(1, actual.getTargetDestinations().size());
assertEquals("/queue/foo-user" + SESSION_ID, actual.getTargetDestinations().iterator().next());
assertEquals(sourceDestination, actual.getSubscribeDestination());
assertNull(actual.getUser());
}
@Test
public void handleUnsubscribe() {
Message<?> message = createMessage(SimpMessageType.UNSUBSCRIBE, this.user, SESSION_ID, "/user/queue/foo");
@ -116,6 +130,19 @@ public class DefaultUserDestinationResolverTests {
assertEquals("/queue/foo-useropenid123", actual.getTargetDestinations().iterator().next());
}
@Test
public void handleMessageWithNoUser() {
String sourceDestination = "/user/" + SESSION_ID + "/queue/foo";
Message<?> message = createMessage(SimpMessageType.MESSAGE, null, SESSION_ID, sourceDestination);
UserDestinationResult actual = this.resolver.resolveDestination(message);
assertEquals(sourceDestination, actual.getSourceDestination());
assertEquals(1, actual.getTargetDestinations().size());
assertEquals("/queue/foo-user123", actual.getTargetDestinations().iterator().next());
assertEquals("/user/queue/foo", actual.getSubscribeDestination());
assertNull(actual.getUser());
}
@Test
public void ignoreMessage() {
@ -129,11 +156,6 @@ public class DefaultUserDestinationResolverTests {
actual = this.resolver.resolveDestination(message);
assertNull(actual);
// subscribe + no user
message = createMessage(SimpMessageType.SUBSCRIBE, null, SESSION_ID, "/user/queue/foo");
actual = this.resolver.resolveDestination(message);
assertNull(actual);
// subscribe + not a user destination
message = createMessage(SimpMessageType.SUBSCRIBE, this.user, SESSION_ID, "/queue/foo");
actual = this.resolver.resolveDestination(message);

View File

@ -108,10 +108,6 @@ public class UserDestinationMessageHandlerTests {
this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", "123", "/queue/foo"));
Mockito.verifyZeroInteractions(this.brokerChannel);
// subscribe + no user
this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, null, "123", "/user/queue/foo"));
Mockito.verifyZeroInteractions(this.brokerChannel);
// subscribe + not a user destination
this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, "joe", "123", "/queue/foo"));
Mockito.verifyZeroInteractions(this.brokerChannel);

View File

@ -301,6 +301,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
String origDestination = stompAccessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
if (origDestination != null) {
stompAccessor = toMutableAccessor(stompAccessor, message);
stompAccessor.removeNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
stompAccessor.setDestination(origDestination);
}
}

View File

@ -246,6 +246,7 @@ public class StompSubProtocolHandlerTests {
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> textMessage = this.session.getSentMessages().get(0);
assertTrue(((String) textMessage.getPayload()).contains("destination:/user/queue/foo\n"));
assertFalse(((String) textMessage.getPayload()).contains(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
}
@Test

View File

@ -36,6 +36,7 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
@ -118,10 +119,10 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
@Test
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
TextMessage message1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
TextMessage message2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
TextMessage m2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
@ -140,8 +141,8 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
@Test
public void sendSubscribeToControllerAndReceiveReply() throws Exception {
TextMessage message = create(StompCommand.SUBSCRIBE).headers(
"id:subs1", "destination:/app/number").build();
String destHeader = "destination:/app/number";
TextMessage message = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
@ -149,7 +150,7 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
try {
assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(0).getPayload();
assertTrue("Expected STOMP destination=/app/number, got " + payload, payload.contains("destination:/app/number"));
assertTrue("Expected STOMP destination=/app/number, got " + payload, payload.contains(destHeader));
assertTrue("Expected STOMP Payload=42, got " + payload, payload.contains("42"));
}
finally {
@ -157,6 +158,29 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
}
}
@Test
public void handleExceptionAndSendToUser() throws Exception {
String destHeader = "destination:/user/queue/error";
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
TextMessage m2 = create(StompCommand.SEND).headers("destination:/app/exception").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(0).getPayload();
assertTrue(payload.startsWith("MESSAGE\n"));
assertTrue(payload.contains("destination:/user/queue/error\n"));
assertTrue(payload.endsWith("\"Got error: Bad input\"\0"));
}
finally {
session.close();
}
}
@IntegrationTestController
static class SimpleController {
@ -174,10 +198,10 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
}
@MessageExceptionHandler
public void handleException(IllegalArgumentException ex) {
@SendToUser("/queue/error")
public String handleException(IllegalArgumentException ex) {
return "Got error: " + ex.getMessage();
}
}
@IntegrationTestController

View File

@ -5,6 +5,5 @@ log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} [%c] - %m%n
log4j.rootCategory=WARN, console
log4j.logger.org.springframework.web=DEBUG
log4j.logger.org.springframework.web.socket=DEBUG
#log4j.logger.org.springframework.web.socket=TRACE
log4j.logger.org.springframework.messaging=DEBUG

View File

@ -38116,35 +38116,113 @@ be plugged in (see examples in <<websocket-server-deployment>>).
==== User Destinations
An application can send messages targeting a specific user.
In order for a connected user to receive messages, they must be authenticated
so that their session is associated with a concrete user name.
See the previous section on information about authentication.
Spring's STOMP support recognizes destinations prefixed with `/user/`.
For example, a client can subscribe to destination `/user/position-updates`.
Spring's STOMP support recognizes destinations prefixed with `"/user/"`.
For example, a client might subscribe to the destination `"/user/position-updates"`.
This destination will be handled by the `UserDestinationMessageHandler` and
transformed into a destination unique to the user's session,
e.g. `/user/position-updates-123`. This provides the convenience of subscribing
to a generically named destination, while also ensuring that it doesn't "collide"
with any other user that also subscribes to `/user/position-updates`
in order to receive stock position updates unique to them.
transformed into a destination unique to the user session,
e.g. `"/user/position-updates-user123"`. This provides the convenience of subscribing
to a generically named destination while at the same time ensuring no collisions
with other users subscribing to the same destination so that each user can receive
unique stock position updates.
On the sending side, messages can be sent to a destination such as
`/user/{username}/position-updates`, which in turn will be translated
by the `UserDestinationMessageHandler` into the same unique destination
belonging to the specified user name.
On the sending side messages can be sent to a destination such as
`"/user/{username}/position-updates"`, which in turn will be translated
by the `UserDestinationMessageHandler` into one or more destinations, one for each
session associated with the user. This allows any component within the application to
send messages targeting a specific user without necessarily knowing anything more
than their name and the generic destination. This is also supported through an
annotation as well as a messaging template.
This allows any component within the application to send messages to a specific
user without necessarily knowing anything more than their name and a generic
destination.
For example message-handling method can send messages to the user associated with
the message being handled through the `@SendToUser` annotation:
When this is used with an external message broker, check the broker documentation
on how to manage inactive queues, so that when the user session is over, all
unique user queues are removed. For example, RabbitMQ creates auto-delete queues
when destinations like `/exchange/amq.direct/position-updates` are used.
[source,java,indent=0]
[subs="verbatim,quotes"]
----
@Controller
public class MyController {
@MessageMapping("/trade")
@SendToUser("/queue/position-updates")
public TradeResult executeTrade(Trade trade, Principal principal) {
// ...
return tradeResult;
}
}
----
If the user has more than one sessions, by default all of the sessions subscribed
to the given destination are targeted. However sometimes, it may be necessary to
target only the session that sent the message being handled. This can be done by
setting the `broadcast` attribute to false, for example:
[source,java,indent=0]
[subs="verbatim,quotes"]
----
@Controller
public class MyController {
@MessageMapping("/action")
public void handleAction() throws Exception{
// raise MyBusinessException here
}
@MessageExceptionHandler
@SendToUser(value="/queue/errors", broadcast=false)
public ApplicationError handleException(MyBusinessException exception) {
// ...
return appError;
}
}
----
[NOTE]
====
While user destinations generally imply an authenticated user, it isn't required
strictly. A WebSocket session that is not associated with an authenticated user
can subscribe to a user destination. In such cases the `@SendToUser` annotation
will behave exactly the same as with `broadcast=false`, i.e. targeting only the
session that sent the message being handled.
====
It is also possible to send a message to user destinations from any application
component by injecting the `SimpMessageTemplate` created by the Java config or
XML namespace, for example (the bean name is "brokerMessagingTemplate` if required
for qualification with `@Qualifier`):
[source,java,indent=0]
[subs="verbatim,quotes"]
----
@Service
public class TradeServiceImpl implements TradeService {
private final SimpMessageTemplate messagingTemplate;
@Autowired
public TradeServiceImpl(SimpMessageTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// ...
public void afterTradeExecuted(Trade trade) {
this.messagingTemplate.convertAndSendToUser(
trade.getUserName(), "/queue/position-updates", trade.getResult());
}
----
[NOTE]
====
When using user destinations with an external message broker, check the broker
documentation on how to manage inactive queues, so that when the user session is
over, all unique user queues are removed. For example, RabbitMQ creates auto-delete
queues when destinations like `/exchange/amq.direct/position-updates` are used.
So in that case the client could subscribe to `/user/exchange/amq.direct/position-updates`.
ActiveMQ has http://activemq.apache.org/delete-inactive-destinations.html[configuration options]
for purging inactive destinations.
====