Fix issue with subscribe destination

The original fix for SPR-11423:
32e5f57e64

was insufficient when using an external broker since the original
destination header has to be in the "native headers" map (i.e. with
STOMP headers) in order to be included in messages broadcast by
the broker.
This commit is contained in:
Rossen Stoyanchev 2014-04-25 12:26:04 -04:00
parent 564a555619
commit 1200755125
4 changed files with 15 additions and 10 deletions

View File

@ -175,10 +175,8 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
}
if (SimpMessageType.MESSAGE.equals(SimpMessageHeaderAccessor.getMessageType(message.getHeaders()))) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.wrap(message);
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(headerAccessor);
}
headerAccessor.setHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
initHeaders(headerAccessor);
headerAccessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
}
for (String destination : destinations) {
@ -189,4 +187,10 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
}
}
private void initHeaders(SimpMessageHeaderAccessor headerAccessor) {
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(headerAccessor);
}
}
}

View File

@ -91,8 +91,9 @@ public class UserDestinationMessageHandlerTests {
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
Mockito.verify(this.brokerChannel).send(captor.capture());
assertEquals("/queue/foo-user123", SimpMessageHeaderAccessor.getDestination(captor.getValue().getHeaders()));
assertEquals("/user/queue/foo", captor.getValue().getHeaders().get(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(captor.getValue());
assertEquals("/queue/foo-user123", accessor.getDestination());
assertEquals("/user/queue/foo", accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
}

View File

@ -298,10 +298,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
logger.error("Ignoring message, no subscriptionId header: " + message);
return;
}
String header = SimpMessageHeaderAccessor.ORIGINAL_DESTINATION;
if (message.getHeaders().containsKey(header)) {
String origDestination = stompAccessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
if (origDestination != null) {
stompAccessor = toMutableAccessor(stompAccessor, message);
stompAccessor.setDestination((String) message.getHeaders().get(header));
stompAccessor.setDestination(origDestination);
}
}
else if (StompCommand.CONNECTED.equals(command)) {

View File

@ -239,7 +239,7 @@ public class StompSubProtocolHandlerTests {
headers.setMessageId("mess0");
headers.setSubscriptionId("sub0");
headers.setDestination("/queue/foo-user123");
headers.setHeader(StompHeaderAccessor.ORIGINAL_DESTINATION, "/user/queue/foo");
headers.setNativeHeader(StompHeaderAccessor.ORIGINAL_DESTINATION, "/user/queue/foo");
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);