Fix issue in simple broker with peer-to-peer messages
Issue: SPR-10930
This commit is contained in:
parent
48caeef4de
commit
6ddacdc01d
|
@ -192,8 +192,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
|
||||||
if (SimpMessageType.MESSAGE.equals(messageType)) {
|
if (SimpMessageType.MESSAGE.equals(messageType)) {
|
||||||
sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId;
|
sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId;
|
||||||
headers.setSessionId(sessionId);
|
headers.setSessionId(sessionId);
|
||||||
command = (command == null) ? StompCommand.SEND : command;
|
headers.updateStompCommandAsClientMessage();
|
||||||
headers.setCommandIfNotSet(command);
|
|
||||||
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
|
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
|
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
|
||||||
|
import org.springframework.messaging.simp.SimpMessageType;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
@ -218,9 +219,29 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
|
||||||
return toNativeHeaderMap();
|
return toNativeHeaderMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setCommandIfNotSet(StompCommand command) {
|
public void updateStompCommandAsClientMessage() {
|
||||||
|
|
||||||
|
Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()),
|
||||||
|
"Unexpected message type " + getMessage());
|
||||||
|
|
||||||
if (getCommand() == null) {
|
if (getCommand() == null) {
|
||||||
setHeader(COMMAND_HEADER, command);
|
setHeader(COMMAND_HEADER, StompCommand.SEND);
|
||||||
|
}
|
||||||
|
else if (!getCommand().equals(StompCommand.SEND)) {
|
||||||
|
throw new IllegalStateException("Unexpected STOMP command " + getCommand());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateStompCommandAsServerMessage() {
|
||||||
|
|
||||||
|
Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()),
|
||||||
|
"Unexpected message type " + getMessage());
|
||||||
|
|
||||||
|
if ((getCommand() == null) || getCommand().equals(StompCommand.SEND)) {
|
||||||
|
setHeader(COMMAND_HEADER, StompCommand.MESSAGE);
|
||||||
|
}
|
||||||
|
else if (!getCommand().equals(StompCommand.MESSAGE)) {
|
||||||
|
throw new IllegalStateException("Unexpected STOMP command " + getCommand());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -150,9 +150,6 @@ public class StompProtocolHandler implements SubProtocolHandler {
|
||||||
public void handleMessageToClient(WebSocketSession session, Message<?> message) {
|
public void handleMessageToClient(WebSocketSession session, Message<?> message) {
|
||||||
|
|
||||||
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
|
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
|
||||||
if (headers.getCommand() == null && SimpMessageType.MESSAGE == headers.getMessageType()) {
|
|
||||||
headers.setCommandIfNotSet(StompCommand.MESSAGE);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (headers.getMessageType() == SimpMessageType.CONNECT_ACK) {
|
if (headers.getMessageType() == SimpMessageType.CONNECT_ACK) {
|
||||||
StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);
|
StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);
|
||||||
|
@ -160,6 +157,9 @@ public class StompProtocolHandler implements SubProtocolHandler {
|
||||||
connectedHeaders.setHeartbeat(0, 0); // no heart-beat support with simple broker
|
connectedHeaders.setHeartbeat(0, 0); // no heart-beat support with simple broker
|
||||||
headers = connectedHeaders;
|
headers = connectedHeaders;
|
||||||
}
|
}
|
||||||
|
else if (SimpMessageType.MESSAGE.equals(headers.getMessageType())) {
|
||||||
|
headers.updateStompCommandAsServerMessage();
|
||||||
|
}
|
||||||
|
|
||||||
if (headers.getCommand() == StompCommand.CONNECTED) {
|
if (headers.getCommand() == StompCommand.CONNECTED) {
|
||||||
augmentConnectedHeaders(headers, session);
|
augmentConnectedHeaders(headers, session);
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrati
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void simpleController() throws Exception {
|
public void sendMessageToController() throws Exception {
|
||||||
|
|
||||||
TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build();
|
TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build();
|
||||||
WebSocketSession session = doHandshake(new TestClientWebSocketHandler(0, message), "/ws").get();
|
WebSocketSession session = doHandshake(new TestClientWebSocketHandler(0, message), "/ws").get();
|
||||||
|
@ -95,10 +95,10 @@ public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrati
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void incrementController() throws Exception {
|
public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
|
||||||
|
|
||||||
TextMessage message1 = create(StompCommand.SUBSCRIBE).headers(
|
TextMessage message1 = create(StompCommand.SUBSCRIBE).headers(
|
||||||
"id:subs1", "destination:/topic/increment").body("5").build();
|
"id:subs1", "destination:/topic/increment").build();
|
||||||
|
|
||||||
TextMessage message2 = create(StompCommand.SEND).headers(
|
TextMessage message2 = create(StompCommand.SEND).headers(
|
||||||
"destination:/app/topic/increment").body("5").build();
|
"destination:/app/topic/increment").body("5").build();
|
||||||
|
@ -114,6 +114,28 @@ public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrati
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SPR-10930
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
|
||||||
|
|
||||||
|
TextMessage message1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
|
||||||
|
TextMessage message2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
|
||||||
|
|
||||||
|
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
|
||||||
|
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
String payload = clientHandler.actual.get(0).getPayload();
|
||||||
|
assertTrue("Expected STOMP Command=MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@IntegrationTestController
|
@IntegrationTestController
|
||||||
static class SimpleController {
|
static class SimpleController {
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.springframework.messaging.simp.SimpMessageType;
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -146,7 +146,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
|
||||||
|
|
||||||
@Test(expected=MessageDeliveryException.class)
|
@Test(expected=MessageDeliveryException.class)
|
||||||
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
|
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
|
||||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
|
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
|
||||||
this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders("test", headers).build());
|
this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders("test", headers).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
</appender>
|
</appender>
|
||||||
|
|
||||||
<logger name="org.springframework.messaging">
|
<logger name="org.springframework.messaging">
|
||||||
<level value="trace" />
|
<level value="debug" />
|
||||||
</logger>
|
</logger>
|
||||||
|
|
||||||
<logger name="org.apache.activemq">
|
<logger name="org.apache.activemq">
|
||||||
|
|
Loading…
Reference in New Issue