Add AbstractStompService

This commit is contained in:
Rossen Stoyanchev 2013-05-30 11:50:07 -04:00
parent 4de40fad8e
commit b194d4d6a0
4 changed files with 282 additions and 157 deletions

View File

@ -0,0 +1,145 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompMessage;
import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractStompService {
protected final Log logger = LogFactory.getLog(getClass());
private final Reactor reactor;
public AbstractStompService(Reactor reactor) {
Assert.notNull(reactor, "reactor is required");
this.reactor = reactor;
this.reactor.on(Fn.$(StompCommand.CONNECT), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processConnect(event.getData(), event.getReplyTo());
}
});
this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processSubscribe(event.getData(), event.getReplyTo());
}
});
this.reactor.on(Fn.$(StompCommand.SEND), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processSend(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.DISCONNECT), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processDisconnect(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.ACK), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processAck(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.NACK), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processNack(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.BEGIN), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processBegin(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.COMMIT), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processCommit(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.ABORT), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processAbort(event.getData());
}
});
this.reactor.on(Fn.$("CONNECTION_CLOSED"), new Consumer<Event<String>>() {
@Override
public void accept(Event<String> event) {
processConnectionClosed(event.getData());
}
});
}
public Reactor getReactor() {
return this.reactor;
}
protected void processConnect(StompMessage message, Object replyTo) {
}
protected void processSubscribe(StompMessage message, Object replyTo) {
}
protected void processSend(StompMessage message) {
}
protected void processDisconnect(StompMessage message) {
}
protected void processAck(StompMessage message) {
}
protected void processNack(StompMessage message) {
}
protected void processBegin(StompMessage message) {
}
protected void processCommit(StompMessage message) {
}
protected void processAbort(StompMessage message) {
}
protected void processConnectionClosed(String sessionId) {
}
}

View File

@ -28,17 +28,13 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.support.StompMessageConverter;
import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
import reactor.util.Assert;
@ -47,12 +43,8 @@ import reactor.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class RelayStompService {
public class RelayStompService extends AbstractStompService {
private static final Log logger = LogFactory.getLog(RelayStompService.class);
private final Reactor reactor;
private Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();
@ -62,43 +54,46 @@ public class RelayStompService {
public RelayStompService(Reactor reactor, TaskExecutor executor) {
this.reactor = reactor;
super(reactor);
this.taskExecutor = executor; // For now, a naive way to manage socket reading
this.reactor.on(Fn.$(StompCommand.CONNECT), new ConnectConsumer());
this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.SEND), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.DISCONNECT), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.ACK), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.NACK), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.BEGIN), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.COMMIT), new RelayConsumer());
this.reactor.on(Fn.$(StompCommand.ABORT), new RelayConsumer());
this.reactor.on(Fn.$("CONNECTION_CLOSED"), new Consumer<Event<String>>() {
@Override
public void accept(Event<String> event) {
if (logger.isDebugEnabled()) {
logger.debug("CONNECTION_CLOSED, STOMP session=" + event.getData() + ". Clearing relay session");
}
clearRelaySession(event.getData());
}
});
}
private void relayStompMessage(RelaySession session, StompMessage stompMessage) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Forwarding: " + stompMessage);
protected void processConnect(StompMessage stompMessage, final Object replyTo) {
final String stompSessionId = stompMessage.getStompSessionId();
final RelaySession session = new RelaySession();
this.relaySessions.put(stompSessionId, session);
try {
Socket socket = SocketFactory.getDefault().createSocket("127.0.0.1", 61613);
session.setSocket(socket);
relayStompMessage(stompMessage);
taskExecutor.execute(new RelayReadTask(stompSessionId, replyTo, session));
}
catch (Throwable t) {
t.printStackTrace();
clearRelaySession(stompSessionId);
}
byte[] bytes = converter.fromStompMessage(stompMessage);
session.getOutputStream().write(bytes);
session.getOutputStream().flush();
}
private RelaySession getRelaySession(String stompSessionId) {
RelaySession session = RelayStompService.this.relaySessions.get(stompSessionId);
private void relayStompMessage(StompMessage stompMessage) {
RelaySession session = RelayStompService.this.relaySessions.get(stompMessage.getStompSessionId());
Assert.notNull(session, "RelaySession not found");
return session;
try {
if (logger.isTraceEnabled()) {
logger.trace("Forwarding: " + stompMessage);
}
byte[] bytes = converter.fromStompMessage(stompMessage);
session.getOutputStream().write(bytes);
session.getOutputStream().flush();
}
catch (Exception e) {
e.printStackTrace();
clearRelaySession(stompMessage.getStompSessionId());
}
}
private void clearRelaySession(String stompSessionId) {
@ -114,34 +109,55 @@ public class RelayStompService {
}
}
private final class ConnectConsumer implements Consumer<Event<StompMessage>> {
@Override
public void accept(Event<StompMessage> event) {
StompMessage stompMessage = event.getData();
final Object replyTo = event.getReplyTo();
final String stompSessionId = stompMessage.getStompSessionId();
final RelaySession session = new RelaySession();
relaySessions.put(stompSessionId, session);
try {
Socket socket = SocketFactory.getDefault().createSocket("127.0.0.1", 61613);
session.setSocket(socket);
relayStompMessage(session, stompMessage);
taskExecutor.execute(new RelayReadTask(stompSessionId, replyTo, session));
}
catch (Throwable t) {
t.printStackTrace();
clearRelaySession(stompSessionId);
}
}
@Override
protected void processSubscribe(StompMessage message, Object replyTo) {
relayStompMessage(message);
}
@Override
protected void processSend(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processDisconnect(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processAck(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processNack(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processBegin(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processCommit(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processAbort(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processConnectionClosed(String sessionId) {
if (logger.isDebugEnabled()) {
logger.debug("Client connection closed for STOMP session=" + sessionId + ". Clearing relay session.");
}
clearRelaySession(sessionId);
}
private final static class RelaySession {
private Socket socket;
@ -194,7 +210,7 @@ public class RelayStompService {
else if (b == 0x00) {
byte[] bytes = out.toByteArray();
StompMessage message = RelayStompService.this.converter.toStompMessage(bytes);
RelayStompService.this.reactor.notify(replyTo, Fn.event(message));
getReactor().notify(replyTo, Event.wrap(message));
out.reset();
}
else {
@ -202,35 +218,19 @@ public class RelayStompService {
}
}
logger.debug("Socket closed, STOMP session=" + stompSessionId);
sendLostConnectionErrorMessage();
sendErrorMessage("Lost connection");
}
catch (IOException e) {
e.printStackTrace();
logger.error("Socket error: " + e.getMessage());
clearRelaySession(stompSessionId);
}
}
private void sendLostConnectionErrorMessage() {
private void sendErrorMessage(String message) {
StompHeaders headers = new StompHeaders();
headers.setMessage("Lost connection");
headers.setMessage(message);
StompMessage errorMessage = new StompMessage(StompCommand.ERROR, headers);
RelayStompService.this.reactor.notify(replyTo, Fn.event(errorMessage));
}
}
private class RelayConsumer implements Consumer<Event<StompMessage>> {
@Override
public void accept(Event<StompMessage> event) {
StompMessage stompMessage = event.getData();
RelaySession session = getRelaySession(stompMessage.getStompSessionId());
try {
relayStompMessage(session, stompMessage);
}
catch (Exception e) {
e.printStackTrace();
clearRelaySession(stompMessage.getStompSessionId());
}
getReactor().notify(replyTo, Event.wrap(errorMessage));
}
}

View File

@ -21,8 +21,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
@ -38,20 +36,35 @@ import reactor.fn.Registration;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleStompService {
public class SimpleStompService extends AbstractStompService {
private static final Log logger = LogFactory.getLog(SimpleStompService.class);
private final Reactor reactor;
private Map<String, List<Registration<?>>> subscriptionsBySession = new ConcurrentHashMap<String, List<Registration<?>>>();
private Map<String, List<Registration<?>>> subscriptionsBySession =
new ConcurrentHashMap<String, List<Registration<?>>>();
public SimpleStompService(Reactor reactor) {
this.reactor = reactor;
this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new SubscribeConsumer());
this.reactor.on(Fn.$(StompCommand.SEND), new SendConsumer());
this.reactor.on(Fn.$(StompCommand.DISCONNECT), new DisconnectConsumer());
super(reactor);
}
@Override
protected void processSubscribe(StompMessage message, final Object replyTo) {
if (logger.isDebugEnabled()) {
logger.debug("Subscribe " + message);
}
Registration<?> registration = getReactor().on(
Fn.$("destination:" + message.getHeaders().getDestination()),
new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> sendEvent) {
StompMessage inMessage = sendEvent.getData();
StompHeaders headers = new StompHeaders();
headers.setDestination(inMessage.getHeaders().getDestination());
StompMessage outMessage = new StompMessage(StompCommand.MESSAGE, headers, inMessage.getPayload());
getReactor().notify(replyTo, Event.wrap(outMessage));
}
});
addSubscription(message.getStompSessionId(), registration);
}
private void addSubscription(String sessionId, Registration<?> registration) {
@ -63,6 +76,23 @@ public class SimpleStompService {
list.add(registration);
}
@Override
protected void processSend(StompMessage message) {
logger.debug("Message received: " + message);
String destination = message.getHeaders().getDestination();
getReactor().notify("destination:" + destination, Event.wrap(message));
}
@Override
protected void processDisconnect(StompMessage message) {
removeSubscriptions(message.getStompSessionId());
}
@Override
protected void processConnectionClosed(String sessionId) {
removeSubscriptions(sessionId);
}
private void removeSubscriptions(String sessionId) {
List<Registration<?>> registrations = this.subscriptionsBySession.remove(sessionId);
if (logger.isTraceEnabled()) {
@ -73,54 +103,4 @@ public class SimpleStompService {
}
}
private final class SubscribeConsumer implements Consumer<Event<StompMessage>> {
@Override
public void accept(Event<StompMessage> event) {
StompMessage message = event.getData();
if (logger.isDebugEnabled()) {
logger.debug("Subscribe " + message);
}
Registration<?> registration = SimpleStompService.this.reactor.on(
Fn.$("destination:" + message.getHeaders().getDestination()),
new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
StompMessage inMessage = event.getData();
StompHeaders headers = new StompHeaders();
headers.setDestination(inMessage.getHeaders().getDestination());
StompMessage outMessage = new StompMessage(StompCommand.MESSAGE, headers, inMessage.getPayload());
SimpleStompService.this.reactor.notify(event.getReplyTo(), Fn.event(outMessage));
}
});
addSubscription(message.getStompSessionId(), registration);
}
}
private final class SendConsumer implements Consumer<Event<StompMessage>> {
@Override
public void accept(Event<StompMessage> event) {
StompMessage message = event.getData();
logger.debug("Message received: " + message);
String destination = message.getHeaders().getDestination();
SimpleStompService.this.reactor.notify("destination:" + destination, Fn.event(message));
}
}
private final class DisconnectConsumer implements Consumer<Event<String>> {
@Override
public void accept(Event<String> event) {
String sessionId = event.getData();
SimpleStompService.this.removeSubscriptions(sessionId);
}
}
}

View File

@ -77,10 +77,10 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
disconnect(session, message);
}
else if (StompCommand.ACK.equals(command) || StompCommand.NACK.equals(command)) {
this.reactor.notify(command, Fn.event(message));
this.reactor.notify(command, Event.wrap(message));
}
else if (StompCommand.BEGIN.equals(command) || StompCommand.COMMIT.equals(command) || StompCommand.ABORT.equals(command)) {
this.reactor.notify(command, Fn.event(message));
this.reactor.notify(command, Event.wrap(message));
}
else {
sendErrorMessage(session, "Invalid STOMP command " + command);
@ -96,7 +96,7 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
@Override
public void run() {
removeSubscriptions(session);
reactor.notify("CONNECTION_CLOSED", Fn.event(session.getId()));
reactor.notify("CONNECTION_CLOSED", Event.wrap(session.getId()));
}
});
}
@ -168,7 +168,7 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
addRegistration(session.getId(), registration);
this.reactor.notify(StompCommand.CONNECT, Fn.event(stompMessage, replyToKey));
this.reactor.notify(StompCommand.CONNECT, Event.wrap(stompMessage, replyToKey));
}
protected void subscribe(final StompSession session, StompMessage message) {
@ -198,7 +198,7 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
addRegistration(session.getId(), registration);
this.reactor.notify(StompCommand.SUBSCRIBE, Fn.event(message, replyToKey));
this.reactor.notify(StompCommand.SUBSCRIBE, Event.wrap(message, replyToKey));
// TODO: need a way to communicate back if subscription was successfully created or
// not in which case an ERROR should be sent back and close the connection
@ -220,7 +220,7 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
protected void unsubscribe(StompSession session, StompMessage message) {
cancelRegistration(session, message.getHeaders().getId());
this.reactor.notify(StompCommand.UNSUBSCRIBE, Fn.event(message));
this.reactor.notify(StompCommand.UNSUBSCRIBE, Event.wrap(message));
}
private void cancelRegistration(StompSession session, String subscriptionId) {
@ -238,12 +238,12 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
}
protected void send(StompSession session, StompMessage stompMessage) {
this.reactor.notify(StompCommand.SEND, Fn.event(stompMessage));
this.reactor.notify(StompCommand.SEND, Event.wrap(stompMessage));
}
protected void disconnect(StompSession session, StompMessage stompMessage) {
removeSubscriptions(session);
this.reactor.notify(StompCommand.DISCONNECT, Fn.event(stompMessage));
this.reactor.notify(StompCommand.DISCONNECT, Event.wrap(stompMessage));
}
private boolean removeSubscriptions(StompSession session) {