From 01c4e458c75bbdcff3772dce2c0fada56c8993b9 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 19 Jun 2013 18:54:51 -0400 Subject: [PATCH] Add support for "system" STOMP session The "system" STOMP session is established at startup and can be used to send messages without a client session, e.g. to support broadcasting from a REST/HTTP handler method. --- .../stomp/support/StompHeaderAccessor.java | 21 ++++ .../StompRelayPubSubMessageHandler.java | 107 +++++++++++++++--- .../support/PubSubHeaderAccesssor.java | 2 +- 3 files changed, 115 insertions(+), 15 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompHeaderAccessor.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompHeaderAccessor.java index f47d98fa17..54886206e6 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompHeaderAccessor.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompHeaderAccessor.java @@ -67,6 +67,10 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor { public static final String NACK = "nack"; + public static final String LOGIN = "login"; + + public static final String PASSCODE = "passcode"; + public static final String DESTINATION = "destination"; public static final String CONTENT_TYPE = "content-type"; @@ -297,6 +301,23 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor { return getHeaderValue(NACK); } + public void setLogin(String login) { + this.headers.put(LOGIN, login); + } + + public String getLogin() { + return getHeaderValue(LOGIN); + } + + + public void setPasscode(String passcode) { + this.headers.put(PASSCODE, passcode); + } + + public String getPasscode() { + return getHeaderValue(PASSCODE); + } + public void setReceiptId(String receiptId) { this.headers.put(RECEIPT_ID, receiptId); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java index 1a9fcadbec..1bfbbbf7f1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java @@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import org.springframework.context.SmartLifecycle; import org.springframework.http.MediaType; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -54,7 +55,10 @@ import reactor.tcp.netty.NettyTcpClient; * @since 4.0 */ @SuppressWarnings("rawtypes") -public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler { +public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler + implements SmartLifecycle { + + private static final String STOMP_RELAY_SYSTEM_SESSION_ID = "stompRelaySystemSessionId"; private MessageChannel clientChannel; @@ -62,26 +66,22 @@ public class StompRelayPubSubMessageHandler extends AbstractP private MessageConverter payloadConverter; - private final TcpClient tcpClient; + private TcpClient tcpClient; private final Map relaySessions = new ConcurrentHashMap(); + private Object lifecycleMonitor = new Object(); + + private boolean running = false; + /** * @param clientChannel a channel for sending messages from the remote message broker * back to clients */ public StompRelayPubSubMessageHandler(PubSubChannelRegistry registry) { - Assert.notNull(registry, "registry is required"); this.clientChannel = registry.getClientOutputChannel(); - - this.tcpClient = new TcpClient.Spec(NettyTcpClient.class) - .using(new Environment()) - .codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)) - .connect("127.0.0.1", 61613) - .get(); - this.payloadConverter = new CompositeMessageConverter(null); } @@ -94,6 +94,71 @@ public class StompRelayPubSubMessageHandler extends AbstractP return null; } + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public int getPhase() { + return Integer.MAX_VALUE; + } + + @Override + public boolean isRunning() { + synchronized (this.lifecycleMonitor) { + return this.running; + } + } + + @Override + public void start() { + synchronized (this.lifecycleMonitor) { + + // TODO: make this configurable + + this.tcpClient = new TcpClient.Spec(NettyTcpClient.class) + .using(new Environment()) + .codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)) + .connect("127.0.0.1", 61613) + .get(); + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setAcceptVersion("1.1,1.2"); + headers.setLogin("guest"); + headers.setPasscode("guest"); + headers.setHeartbeat(0, 0); + @SuppressWarnings("unchecked") + M message = (M) MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toStompMessageHeaders()).build(); + + RelaySession session = new RelaySession(message, headers) { + @Override + protected void sendMessageToClient(M message) { + // TODO: check for ERROR frame (reconnect?) + } + }; + this.relaySessions.put(STOMP_RELAY_SYSTEM_SESSION_ID, session); + + this.running = true; + } + } + + @Override + public void stop() { + synchronized (this.lifecycleMonitor) { + this.running = false; + this.tcpClient.close(); + } + } + + @Override + public void stop(Runnable callback) { + synchronized (this.lifecycleMonitor) { + stop(); + callback.run(); + } + } + @Override public void handleConnect(M message) { StompHeaderAccessor stompHeaders = StompHeaderAccessor.wrap(message); @@ -146,10 +211,19 @@ public class StompRelayPubSubMessageHandler extends AbstractP StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); headers.setStompCommandIfNotSet(command); + if (headers.getSessionId() == null && (StompCommand.SEND.equals(command))) { + + } + String sessionId = headers.getSessionId(); if (sessionId == null) { - logger.error("No sessionId in message " + message); - return; + if (StompCommand.SEND.equals(command)) { + sessionId = STOMP_RELAY_SYSTEM_SESSION_ID; + } + else { + logger.error("No sessionId in message " + message); + return; + } } RelaySession session = this.relaySessions.get(sessionId); @@ -163,7 +237,7 @@ public class StompRelayPubSubMessageHandler extends AbstractP } - private final class RelaySession { + private class RelaySession { private final String sessionId; @@ -236,6 +310,10 @@ public class StompRelayPubSubMessageHandler extends AbstractP } relaySessions.remove(this.sessionId); } + sendMessageToClient(message); + } + + protected void sendMessageToClient(M message) { clientChannel.send(message); } @@ -245,7 +323,7 @@ public class StompRelayPubSubMessageHandler extends AbstractP headers.setMessage(errorText); @SuppressWarnings("unchecked") M errorMessage = (M) MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build(); - clientChannel.send(errorMessage); + sendMessageToClient(errorMessage); } public void forward(M message, StompHeaderAccessor headers) { @@ -309,4 +387,5 @@ public class StompRelayPubSubMessageHandler extends AbstractP return true; } } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubHeaderAccesssor.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubHeaderAccesssor.java index a05329df2e..e429dd16ee 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubHeaderAccesssor.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubHeaderAccesssor.java @@ -171,7 +171,7 @@ public class PubSubHeaderAccesssor { if (this.headers.get(headerName) != null) { return this.headers.get(headerName); } - else if (this.originalHeaders.get(headerName) != null) { + else if ((this.originalHeaders != null) && (this.originalHeaders.get(headerName) != null)) { return this.originalHeaders.get(headerName); } return null;