From 2b5acbda997b8c235d415c14e9ac1987fbab492a Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 28 May 2013 12:20:56 -0400 Subject: [PATCH] Add handling for remaining STOMP server commands --- .../stomp/server/RelayStompService.java | 5 +++ .../server/ServerStompMessageHandler.java | 41 +++++++++++++------ 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/server/RelayStompService.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/server/RelayStompService.java index 697e279814b..0f012fb9ec8 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/server/RelayStompService.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/server/RelayStompService.java @@ -69,6 +69,11 @@ public class RelayStompService { this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new RelayConsumer()); this.reactor.on(Fn.$(StompCommand.SEND), new RelayConsumer()); this.reactor.on(Fn.$(StompCommand.DISCONNECT), new RelayConsumer()); + this.reactor.on(Fn.$(StompCommand.ACK), new RelayConsumer()); + this.reactor.on(Fn.$(StompCommand.NACK), new RelayConsumer()); + this.reactor.on(Fn.$(StompCommand.BEGIN), new RelayConsumer()); + this.reactor.on(Fn.$(StompCommand.COMMIT), new RelayConsumer()); + this.reactor.on(Fn.$(StompCommand.ABORT), new RelayConsumer()); this.reactor.on(Fn.$("CONNECTION_CLOSED"), new Consumer>() { @Override diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/server/ServerStompMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/server/ServerStompMessageHandler.java index 9a1210e45b0..b25d7634fe9 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/server/ServerStompMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/server/ServerStompMessageHandler.java @@ -78,12 +78,10 @@ public class ServerStompMessageHandler implements StompMessageHandler { disconnect(session, message); } else if (StompCommand.ACK.equals(command) || StompCommand.NACK.equals(command)) { - // TODO - logger.warn("Ignoring " + command + ". It is not supported yet."); + this.reactor.notify(command, Fn.event(message)); } else if (StompCommand.BEGIN.equals(command) || StompCommand.COMMIT.equals(command) || StompCommand.ABORT.equals(command)) { - // TODO - logger.warn("Ignoring " + command + ". It is not supported yet."); + this.reactor.notify(command, Fn.event(message)); } else { sendErrorMessage(session, "Invalid STOMP command " + command); @@ -174,22 +172,22 @@ public class ServerStompMessageHandler implements StompMessageHandler { this.reactor.notify(StompCommand.CONNECT, Fn.event(stompMessage, replyToKey)); } - protected void subscribe(final StompSession session, StompMessage stompMessage) { + protected void subscribe(final StompSession session, StompMessage message) { - final String subscription = stompMessage.getHeaders().getId(); - String replyToKey = StompCommand.SUBSCRIBE + ":" + session.getId() + ":" + subscription; + final String subscriptionId = message.getHeaders().getId(); + String replyToKey = getSubscriptionReplyKey(session, subscriptionId); // TODO: extract and remember "ack" mode // http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_ack_Header if (logger.isTraceEnabled()) { - logger.trace("Adding subscription with replyToKey=" + replyToKey); + logger.trace("Adding subscription, key=" + replyToKey); } Registration registration = this.reactor.on(Fn.$(replyToKey), new Consumer>() { @Override public void accept(Event event) { - event.getData().getHeaders().setSubscription(subscription); + event.getData().getHeaders().setSubscription(subscriptionId); try { session.sendMessage(event.getData()); } @@ -201,13 +199,17 @@ public class ServerStompMessageHandler implements StompMessageHandler { addRegistration(session.getId(), registration); - this.reactor.notify(StompCommand.SUBSCRIBE, Fn.event(stompMessage, replyToKey)); + this.reactor.notify(StompCommand.SUBSCRIBE, Fn.event(message, replyToKey)); // TODO: need a way to communicate back if subscription was successfully created or // not in which case an ERROR should be sent back and close the connection // http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE } + private String getSubscriptionReplyKey(StompSession session, String subscriptionId) { + return StompCommand.SUBSCRIBE + ":" + session.getId() + ":" + subscriptionId; + } + private void addRegistration(String sessionId, Registration registration) { List> list = this.registrationsBySession.get(sessionId); if (list == null) { @@ -217,8 +219,23 @@ public class ServerStompMessageHandler implements StompMessageHandler { list.add(registration); } - protected void unsubscribe(StompSession session, StompMessage stompMessage) { - this.reactor.notify(StompCommand.UNSUBSCRIBE, Fn.event(stompMessage)); + protected void unsubscribe(StompSession session, StompMessage message) { + cancelRegistration(session, message.getHeaders().getId()); + this.reactor.notify(StompCommand.UNSUBSCRIBE, Fn.event(message)); + } + + private void cancelRegistration(StompSession session, String subscriptionId) { + String key = getSubscriptionReplyKey(session, subscriptionId); + List> list = this.registrationsBySession.get(session.getId()); + for (Registration registration : list) { + if (registration.getSelector().matches(key)) { + if (logger.isDebugEnabled()) { + logger.debug("Cancelling subscription, key=" + key); + } + list.remove(registration); + registration.cancel(); + } + } } protected void send(StompSession session, StompMessage stompMessage) {