Add handling for remaining STOMP server commands

This commit is contained in:
Rossen Stoyanchev 2013-05-28 12:20:56 -04:00
parent 69ef364ef9
commit 2b5acbda99
2 changed files with 34 additions and 12 deletions

View File

@ -69,6 +69,11 @@ public class RelayStompService {
this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new RelayConsumer()); this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.SEND), new RelayConsumer()); this.reactor.on(Fn.$(StompCommand.SEND), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.DISCONNECT), new RelayConsumer()); this.reactor.on(Fn.$(StompCommand.DISCONNECT), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.ACK), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.NACK), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.BEGIN), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.COMMIT), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.ABORT), new RelayConsumer());
this.reactor.on(Fn.$("CONNECTION_CLOSED"), new Consumer<Event<String>>() { this.reactor.on(Fn.$("CONNECTION_CLOSED"), new Consumer<Event<String>>() {
@Override @Override

View File

@ -78,12 +78,10 @@ public class ServerStompMessageHandler implements StompMessageHandler {
disconnect(session, message); disconnect(session, message);
} }
else if (StompCommand.ACK.equals(command) || StompCommand.NACK.equals(command)) { else if (StompCommand.ACK.equals(command) || StompCommand.NACK.equals(command)) {
// TODO this.reactor.notify(command, Fn.event(message));
logger.warn("Ignoring " + command + ". It is not supported yet.");
} }
else if (StompCommand.BEGIN.equals(command) || StompCommand.COMMIT.equals(command) || StompCommand.ABORT.equals(command)) { else if (StompCommand.BEGIN.equals(command) || StompCommand.COMMIT.equals(command) || StompCommand.ABORT.equals(command)) {
// TODO this.reactor.notify(command, Fn.event(message));
logger.warn("Ignoring " + command + ". It is not supported yet.");
} }
else { else {
sendErrorMessage(session, "Invalid STOMP command " + command); sendErrorMessage(session, "Invalid STOMP command " + command);
@ -174,22 +172,22 @@ public class ServerStompMessageHandler implements StompMessageHandler {
this.reactor.notify(StompCommand.CONNECT, Fn.event(stompMessage, replyToKey)); this.reactor.notify(StompCommand.CONNECT, Fn.event(stompMessage, replyToKey));
} }
protected void subscribe(final StompSession session, StompMessage stompMessage) { protected void subscribe(final StompSession session, StompMessage message) {
final String subscription = stompMessage.getHeaders().getId(); final String subscriptionId = message.getHeaders().getId();
String replyToKey = StompCommand.SUBSCRIBE + ":" + session.getId() + ":" + subscription; String replyToKey = getSubscriptionReplyKey(session, subscriptionId);
// TODO: extract and remember "ack" mode // TODO: extract and remember "ack" mode
// http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_ack_Header // http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_ack_Header
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Adding subscription with replyToKey=" + replyToKey); logger.trace("Adding subscription, key=" + replyToKey);
} }
Registration<?> registration = this.reactor.on(Fn.$(replyToKey), new Consumer<Event<StompMessage>>() { Registration<?> registration = this.reactor.on(Fn.$(replyToKey), new Consumer<Event<StompMessage>>() {
@Override @Override
public void accept(Event<StompMessage> event) { public void accept(Event<StompMessage> event) {
event.getData().getHeaders().setSubscription(subscription); event.getData().getHeaders().setSubscription(subscriptionId);
try { try {
session.sendMessage(event.getData()); session.sendMessage(event.getData());
} }
@ -201,13 +199,17 @@ public class ServerStompMessageHandler implements StompMessageHandler {
addRegistration(session.getId(), registration); addRegistration(session.getId(), registration);
this.reactor.notify(StompCommand.SUBSCRIBE, Fn.event(stompMessage, replyToKey)); this.reactor.notify(StompCommand.SUBSCRIBE, Fn.event(message, replyToKey));
// TODO: need a way to communicate back if subscription was successfully created or // TODO: need a way to communicate back if subscription was successfully created or
// not in which case an ERROR should be sent back and close the connection // not in which case an ERROR should be sent back and close the connection
// http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE // http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
} }
private String getSubscriptionReplyKey(StompSession session, String subscriptionId) {
return StompCommand.SUBSCRIBE + ":" + session.getId() + ":" + subscriptionId;
}
private void addRegistration(String sessionId, Registration<?> registration) { private void addRegistration(String sessionId, Registration<?> registration) {
List<Registration<?>> list = this.registrationsBySession.get(sessionId); List<Registration<?>> list = this.registrationsBySession.get(sessionId);
if (list == null) { if (list == null) {
@ -217,8 +219,23 @@ public class ServerStompMessageHandler implements StompMessageHandler {
list.add(registration); list.add(registration);
} }
protected void unsubscribe(StompSession session, StompMessage stompMessage) { protected void unsubscribe(StompSession session, StompMessage message) {
this.reactor.notify(StompCommand.UNSUBSCRIBE, Fn.event(stompMessage)); cancelRegistration(session, message.getHeaders().getId());
this.reactor.notify(StompCommand.UNSUBSCRIBE, Fn.event(message));
}
private void cancelRegistration(StompSession session, String subscriptionId) {
String key = getSubscriptionReplyKey(session, subscriptionId);
List<Registration<?>> list = this.registrationsBySession.get(session.getId());
for (Registration<?> registration : list) {
if (registration.getSelector().matches(key)) {
if (logger.isDebugEnabled()) {
logger.debug("Cancelling subscription, key=" + key);
}
list.remove(registration);
registration.cancel();
}
}
} }
protected void send(StompSession session, StompMessage stompMessage) { protected void send(StompSession session, StompMessage stompMessage) {