diff --git a/build.gradle b/build.gradle index 41d67e1ba47..68821626078 100644 --- a/build.gradle +++ b/build.gradle @@ -483,11 +483,13 @@ project("spring-websocket") { optional("org.eclipse.jetty.websocket:websocket-server:9.0.3.v20130506") optional("org.eclipse.jetty.websocket:websocket-client:9.0.3.v20130506") optional("com.fasterxml.jackson.core:jackson-databind:2.2.0") // required for SockJS support currently + optional("reactor:reactor-core:1.0.0.BUILD-SNAPSHOT") } repositories { maven { url "https://repository.apache.org/content/repositories/snapshots" } // tomcat-websocket-* snapshots maven { url "https://maven.java.net/content/repositories/releases" } // javax.websocket, tyrus + maven { url 'http://repo.springsource.org/libs-snapshot' } // reactor } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java b/spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java index 3f779a0147d..b9eba5a1cb8 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java @@ -101,4 +101,9 @@ public final class BinaryMessage extends WebSocketMessage { return (getPayload() != null) ? getPayload().remaining() : 0; } + @Override + protected String toStringPayload() { + return (getPayload() != null) ? getPayload().toString() : null; + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java b/spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java index 5c078988ac0..ca1cbf21b80 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java @@ -49,4 +49,9 @@ public final class TextMessage extends WebSocketMessage { return getPayload().length(); } + @Override + protected String toStringPayload() { + return (getPayloadSize() > 25) ? getPayload().substring(0, 25) + "..." : getPayload(); + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java b/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java index 23a32efcae3..bc7f00a46b9 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java @@ -79,9 +79,12 @@ public abstract class WebSocketMessage { @Override public String toString() { - return getClass().getSimpleName() + " [payload length=" + getPayloadSize() + ", last=" + isLast() + "]"; + return getClass().getSimpleName() + " payload= " + toStringPayload() + + ", length=" + getPayloadSize() + ", last=" + isLast() + "]"; } + protected abstract String toStringPayload(); + protected abstract int getPayloadSize(); } diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/StompCommand.java b/spring-websocket/src/main/java/org/springframework/web/stomp/StompCommand.java new file mode 100644 index 00000000000..752933b0bd3 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/StompCommand.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.stomp; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public enum StompCommand { + + // client + CONNECT, + STOMP, + SEND, + SUBSCRIBE, + UNSUBSCRIBE, + ACK, + NACK, + BEGIN, + COMMIT, + ABORT, + DISCONNECT, + + // server + CONNECTED, + MESSAGE, + RECEIPT, + ERROR + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/StompException.java b/spring-websocket/src/main/java/org/springframework/web/stomp/StompException.java new file mode 100644 index 00000000000..08ba53b6b02 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/StompException.java @@ -0,0 +1,37 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.stomp; + +import org.springframework.core.NestedRuntimeException; + +/** + * @author Gary Russell + * @since 4.0 + * + */ +@SuppressWarnings("serial") +public class StompException extends NestedRuntimeException { + + + public StompException(String msg, Throwable cause) { + super(msg, cause); + } + + public StompException(String msg) { + super(msg); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/StompHeaders.java b/spring-websocket/src/main/java/org/springframework/web/stomp/StompHeaders.java new file mode 100644 index 00000000000..38f07c5f9e0 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/StompHeaders.java @@ -0,0 +1,298 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.stomp; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.springframework.util.Assert; +import org.springframework.util.MultiValueMap; +import org.springframework.util.StringUtils; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class StompHeaders implements MultiValueMap, Serializable { + + private static final long serialVersionUID = 1L; + + // Client + private static final String ACCEPT_VERSION = "accept-version"; + + private static final String ID = "id"; + + private static final String HOST = "host"; + + // Server + + private static final String MESSAGE_ID = "message-id"; + + private static final String RECEIPT_ID = "receipt-id"; + + private static final String SUBSCRIPTION = "subscription"; + + private static final String VERSION = "version"; + + // Client and Server + + private static final String ACK = "ack"; + + private static final String DESTINATION = "destination"; + + private static final String HEARTBEAT = "heart-beat"; + + + private final Map> headers; + + + /** + * Private constructor that can create read-only {@code StompHeaders} instances. + */ + private StompHeaders(Map> headers, boolean readOnly) { + Assert.notNull(headers, "'headers' must not be null"); + if (readOnly) { + Map> map = new LinkedHashMap>(headers.size()); + for (Entry> entry : headers.entrySet()) { + List values = Collections.unmodifiableList(entry.getValue()); + map.put(entry.getKey(), values); + } + this.headers = Collections.unmodifiableMap(map); + } + else { + this.headers = headers; + } + } + + /** + * Constructs a new, empty instance of the {@code StompHeaders} object. + */ + public StompHeaders() { + this(new LinkedHashMap>(4), false); + } + + /** + * Returns {@code StompHeaders} object that can only be read, not written to. + */ + public static StompHeaders readOnlyStompHeaders(StompHeaders headers) { + return new StompHeaders(headers, true); + } + + public Set getAcceptVersion() { + String rawValue = getFirst(ACCEPT_VERSION); + return (rawValue != null) ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.emptySet(); + } + + public void setAcceptVersion(String acceptVersion) { + set(ACCEPT_VERSION, acceptVersion); + } + + public String getVersion() { + return getFirst(VERSION); + } + + public void setVersion(String version) { + set(VERSION, version); + } + + public String getDestination() { + return getFirst(DESTINATION); + } + + public void setDestination(String destination) { + set(DESTINATION, destination); + } + + public long[] getHeartbeat() { + String rawValue = getFirst(HEARTBEAT); + if (!StringUtils.hasText(rawValue)) { + return null; + } + String[] rawValues = StringUtils.commaDelimitedListToStringArray(rawValue); + // TODO assertions + return new long[] { Long.valueOf(rawValues[0]), Long.valueOf(rawValues[1])}; + } + + public void setHeartbeat(long cx, long cy) { + set(HEARTBEAT, StringUtils.arrayToCommaDelimitedString(new Object[] {cx, cy})); + } + + public String getId() { + return getFirst(ID); + } + + public void setId(String id) { + set(ID, id); + } + + public String getMessageId() { + return getFirst(MESSAGE_ID); + } + + public void setMessageId(String id) { + set(MESSAGE_ID, id); + } + + public String getSubscription() { + return getFirst(SUBSCRIPTION); + } + + public void setSubscription(String id) { + set(SUBSCRIPTION, id); + } + + + // MultiValueMap methods + + /** + * Return the first header value for the given header name, if any. + * @param headerName the header name + * @return the first header value; or {@code null} + */ + public String getFirst(String headerName) { + List headerValues = headers.get(headerName); + return headerValues != null ? headerValues.get(0) : null; + } + + /** + * Add the given, single header value under the given name. + * @param headerName the header name + * @param headerValue the header value + * @throws UnsupportedOperationException if adding headers is not supported + * @see #put(String, List) + * @see #set(String, String) + */ + public void add(String headerName, String headerValue) { + List headerValues = headers.get(headerName); + if (headerValues == null) { + headerValues = new LinkedList(); + this.headers.put(headerName, headerValues); + } + headerValues.add(headerValue); + } + + /** + * Set the given, single header value under the given name. + * @param headerName the header name + * @param headerValue the header value + * @throws UnsupportedOperationException if adding headers is not supported + * @see #put(String, List) + * @see #add(String, String) + */ + public void set(String headerName, String headerValue) { + List headerValues = new LinkedList(); + headerValues.add(headerValue); + headers.put(headerName, headerValues); + } + + public void setAll(Map values) { + for (Entry entry : values.entrySet()) { + set(entry.getKey(), entry.getValue()); + } + } + + public Map toSingleValueMap() { + LinkedHashMap singleValueMap = new LinkedHashMap(this.headers.size()); + for (Entry> entry : headers.entrySet()) { + singleValueMap.put(entry.getKey(), entry.getValue().get(0)); + } + return singleValueMap; + } + + + // Map implementation + + public int size() { + return this.headers.size(); + } + + public boolean isEmpty() { + return this.headers.isEmpty(); + } + + public boolean containsKey(Object key) { + return this.headers.containsKey(key); + } + + public boolean containsValue(Object value) { + return this.headers.containsValue(value); + } + + public List get(Object key) { + return this.headers.get(key); + } + + public List put(String key, List value) { + return this.headers.put(key, value); + } + + public List remove(Object key) { + return this.headers.remove(key); + } + + public void putAll(Map> m) { + this.headers.putAll(m); + } + + public void clear() { + this.headers.clear(); + } + + public Set keySet() { + return this.headers.keySet(); + } + + public Collection> values() { + return this.headers.values(); + } + + public Set>> entrySet() { + return this.headers.entrySet(); + } + + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof StompHeaders)) { + return false; + } + StompHeaders otherHeaders = (StompHeaders) other; + return this.headers.equals(otherHeaders.headers); + } + + @Override + public int hashCode() { + return this.headers.hashCode(); + } + + @Override + public String toString() { + return this.headers.toString(); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/StompMessage.java b/spring-websocket/src/main/java/org/springframework/web/stomp/StompMessage.java new file mode 100644 index 00000000000..c23067744ba --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/StompMessage.java @@ -0,0 +1,68 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.stomp; + +import java.nio.charset.Charset; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class StompMessage { + + public static final Charset CHARSET = Charset.forName("UTF-8"); + + private final StompCommand command; + + private final StompHeaders headers; + + private final byte[] payload; + + + public StompMessage(StompCommand command, StompHeaders headers, byte[] payload) { + this.command = command; + this.headers = (headers != null) ? headers : new StompHeaders(); + this.payload = payload; + } + + /** + * Constructor for empty payload message. + */ + public StompMessage(StompCommand command, StompHeaders headers) { + this(command, headers, new byte[0]); + } + + public StompCommand getCommand() { + return this.command; + } + + public StompHeaders getHeaders() { + return this.headers; + } + + public byte[] getPayload() { + return this.payload; + } + + @Override + public String toString() { + return "StompMessage [headers=" + this.headers + ", payload=" + new String(this.payload) + "]"; + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/StompSession.java b/spring-websocket/src/main/java/org/springframework/web/stomp/StompSession.java new file mode 100644 index 00000000000..ee2b635889c --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/StompSession.java @@ -0,0 +1,34 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.stomp; + +import java.io.IOException; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface StompSession { + + String getId(); + + void sendMessage(StompMessage message) throws IOException; + + void close() throws Exception; + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/StompMessageProcessor.java b/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/StompMessageProcessor.java new file mode 100644 index 00000000000..45799a87b58 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/StompMessageProcessor.java @@ -0,0 +1,33 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.stomp.adapter; + +import java.io.IOException; + +import org.springframework.web.stomp.StompMessage; +import org.springframework.web.stomp.StompSession; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface StompMessageProcessor { + + void processMessage(StompSession stompSession, StompMessage message) throws IOException; + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/StompWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/StompWebSocketHandler.java new file mode 100644 index 00000000000..019d9c03c48 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/StompWebSocketHandler.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.stomp.adapter; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.util.Assert; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.adapter.TextWebSocketHandlerAdapter; +import org.springframework.web.stomp.StompMessage; +import org.springframework.web.stomp.StompSession; +import org.springframework.web.stomp.support.StompMessageConverter; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class StompWebSocketHandler extends TextWebSocketHandlerAdapter { + + private final StompMessageProcessor messageProcessor; + + private final StompMessageConverter messageConverter = new StompMessageConverter(); + + private final Map sessions = new ConcurrentHashMap(); + + + public StompWebSocketHandler(StompMessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } + + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + WebSocketStompSession stompSession = new WebSocketStompSession(session, this.messageConverter); + this.sessions.put(session.getId(), stompSession); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + + StompSession stompSession = this.sessions.get(session.getId()); + Assert.notNull(stompSession, "No STOMP session for WebSocket session id=" + session.getId()); + + StompMessage stompMessage = this.messageConverter.toStompMessage(message.getPayload()); + this.messageProcessor.processMessage(stompSession, stompMessage); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + this.sessions.remove(session.getId()); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/WebSocketStompSession.java b/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/WebSocketStompSession.java new file mode 100644 index 00000000000..3e34752bc60 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/adapter/WebSocketStompSession.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.stomp.adapter; + +import java.io.IOException; + +import org.springframework.util.Assert; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.stomp.StompMessage; +import org.springframework.web.stomp.StompSession; +import org.springframework.web.stomp.support.StompMessageConverter; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class WebSocketStompSession implements StompSession { + + private final String id; + + private WebSocketSession webSocketSession; + + private final StompMessageConverter messageConverter; + + + public WebSocketStompSession(WebSocketSession webSocketSession, StompMessageConverter messageConverter) { + Assert.notNull(webSocketSession, "webSocketSession is required"); + this.id = webSocketSession.getId(); + this.webSocketSession = webSocketSession; + this.messageConverter = messageConverter; + } + + @Override + public String getId() { + return this.id; + } + + @Override + public void sendMessage(StompMessage message) throws IOException { + Assert.notNull(this.webSocketSession, "Cannot send message without active session"); + byte[] bytes = this.messageConverter.fromStompMessage(message); + this.webSocketSession.sendMessage(new TextMessage(new String(bytes, StompMessage.CHARSET))); + } + + public void sessionClosed() { + this.webSocketSession = null; + } + + @Override + public void close() throws Exception { + this.webSocketSession.close(); + this.webSocketSession = null; + } + +} \ No newline at end of file diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/server/ReactorServerStompMessageProcessor.java b/spring-websocket/src/main/java/org/springframework/web/stomp/server/ReactorServerStompMessageProcessor.java new file mode 100644 index 00000000000..027bf8e1ee0 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/server/ReactorServerStompMessageProcessor.java @@ -0,0 +1,175 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.stomp.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.util.Assert; +import org.springframework.web.stomp.StompCommand; +import org.springframework.web.stomp.StompException; +import org.springframework.web.stomp.StompHeaders; +import org.springframework.web.stomp.StompMessage; +import org.springframework.web.stomp.StompSession; +import org.springframework.web.stomp.adapter.StompMessageProcessor; + +import reactor.Fn; +import reactor.core.Reactor; +import reactor.fn.Consumer; +import reactor.fn.Event; +import reactor.fn.Registration; +import reactor.fn.Tuple; + +/** + * @author Gary Russell + * @author Rossen Stoyanchev + * @since 4.0 + * + */ +public class ReactorServerStompMessageProcessor implements StompMessageProcessor { + + private static Log logger = LogFactory.getLog(ReactorServerStompMessageProcessor.class); + + + private final Reactor reactor; + + private Map>> subscriptionsBySession = new ConcurrentHashMap>>(); + + + public ReactorServerStompMessageProcessor(Reactor reactor) { + this.reactor = reactor; + } + + public void processMessage(StompSession session, StompMessage message) throws IOException { + + StompCommand command = message.getCommand(); + Assert.notNull(command, "STOMP command not found"); + + if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) { + connect(session, message); + } + else if (StompCommand.SUBSCRIBE.equals(command)) { + subscribe(session, message); + } + else if (StompCommand.UNSUBSCRIBE.equals(command)) { + unsubscribe(session, message); + } + else if (StompCommand.SEND.equals(command)) { + send(session, message); + } + else if (StompCommand.DISCONNECT.equals(command)) { + disconnect(session); + } + else { + throw new IllegalStateException("Unexpected command: " + command); + } + } + + protected void connect(StompSession session, StompMessage connectMessage) throws IOException { + + StompHeaders headers = new StompHeaders(); + Set acceptVersions = connectMessage.getHeaders().getAcceptVersion(); + if (acceptVersions.contains("1.2")) { + headers.setVersion("1.2"); + } + else if (acceptVersions.contains("1.1")) { + headers.setVersion("1.1"); + } + else if (acceptVersions.isEmpty()) { + // 1.0 + } + else { + throw new StompException("Unsupported version '" + acceptVersions + "'"); + } + headers.setHeartbeat(0,0); // TODO + headers.setId(session.getId()); + + // TODO: security + + this.reactor.notify(StompCommand.CONNECT, Fn.event(session.getId())); + + session.sendMessage(new StompMessage(StompCommand.CONNECTED, headers)); + } + + protected void subscribe(final StompSession session, StompMessage message) { + + final String subscription = message.getHeaders().getId(); + String replyToKey = StompCommand.SUBSCRIBE + ":" + session.getId() + ":" + subscription; + + if (logger.isTraceEnabled()) { + logger.trace("Adding subscription with replyToKey=" + replyToKey); + } + + Registration registration = this.reactor.on(Fn.$(replyToKey), new Consumer>() { + @Override + public void accept(Event event) { + event.getData().getHeaders().setSubscription(subscription); + try { + session.sendMessage(event.getData()); + } + catch (IOException e) { + // TODO: stomp error, close session, websocket close status + ReactorServerStompMessageProcessor.this.removeSubscriptions(session.getId()); + e.printStackTrace(); + } + } + }); + + addSubscription(session.getId(), registration); + + this.reactor.notify(StompCommand.SUBSCRIBE, Fn.event(Tuple.of(session.getId(), message), replyToKey)); + } + + private void addSubscription(String sessionId, Registration registration) { + List> list = this.subscriptionsBySession.get(sessionId); + if (list == null) { + list = new ArrayList>(); + this.subscriptionsBySession.put(sessionId, list); + } + list.add(registration); + } + + protected void unsubscribe(StompSession session, StompMessage message) { + this.reactor.notify(StompCommand.UNSUBSCRIBE, Fn.event(Tuple.of(session.getId(), message))); + } + + protected void send(StompSession session, StompMessage message) { + this.reactor.notify(StompCommand.SEND, Fn.event(Tuple.of(session.getId(), message))); + } + + protected void disconnect(StompSession session) { + String sessionId = session.getId(); + removeSubscriptions(sessionId); + this.reactor.notify(StompCommand.DISCONNECT, Fn.event(sessionId)); + } + + private void removeSubscriptions(String sessionId) { + List> registrations = this.subscriptionsBySession.remove(sessionId); + if (logger.isTraceEnabled()) { + logger.trace("Cancelling " + registrations.size() + " subscriptions for session=" + sessionId); + } + for (Registration registration : registrations) { + registration.cancel(); + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/server/SimpleStompReactorService.java b/spring-websocket/src/main/java/org/springframework/web/stomp/server/SimpleStompReactorService.java new file mode 100644 index 00000000000..9d22507769e --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/server/SimpleStompReactorService.java @@ -0,0 +1,129 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.stomp.server; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.web.stomp.StompCommand; +import org.springframework.web.stomp.StompHeaders; +import org.springframework.web.stomp.StompMessage; + +import reactor.Fn; +import reactor.core.Reactor; +import reactor.fn.Consumer; +import reactor.fn.Event; +import reactor.fn.Registration; +import reactor.fn.Tuple2; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class SimpleStompReactorService { + + private static final Log logger = LogFactory.getLog(SimpleStompReactorService.class); + + private final Reactor reactor; + + private Map>> subscriptionsBySession = new ConcurrentHashMap>>(); + + + public SimpleStompReactorService(Reactor reactor) { + this.reactor = reactor; + this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new SubscribeConsumer()); + this.reactor.on(Fn.$(StompCommand.SEND), new SendConsumer()); + this.reactor.on(Fn.$(StompCommand.DISCONNECT), new DisconnectConsumer()); + } + + private void addSubscription(String sessionId, Registration registration) { + List> list = this.subscriptionsBySession.get(sessionId); + if (list == null) { + list = new ArrayList>(); + this.subscriptionsBySession.put(sessionId, list); + } + list.add(registration); + } + + private void removeSubscriptions(String sessionId) { + List> registrations = this.subscriptionsBySession.remove(sessionId); + if (logger.isTraceEnabled()) { + logger.trace("Cancelling " + registrations.size() + " subscriptions for session=" + sessionId); + } + for (Registration registration : registrations) { + registration.cancel(); + } + } + + + private final class SubscribeConsumer implements Consumer>> { + + @Override + public void accept(Event> event) { + + String sessionId = event.getData().getT1(); + StompMessage message = event.getData().getT2(); + final Object replyToKey = event.getReplyTo(); + + if (logger.isDebugEnabled()) { + logger.debug("Subscribe " + message); + } + + Registration registration = SimpleStompReactorService.this.reactor.on( + Fn.$("destination:" + message.getHeaders().getDestination()), + new Consumer>() { + @Override + public void accept(Event event) { + StompMessage inMessage = event.getData(); + StompHeaders headers = new StompHeaders(); + headers.setDestination(inMessage.getHeaders().getDestination()); + StompMessage outMessage = new StompMessage(StompCommand.MESSAGE, headers, inMessage.getPayload()); + SimpleStompReactorService.this.reactor.notify(replyToKey, Fn.event(outMessage)); + } + }); + + addSubscription(sessionId, registration); + } + } + + private final class SendConsumer implements Consumer>> { + + @Override + public void accept(Event> event) { + StompMessage message = event.getData().getT2(); + logger.debug("Message received: " + message); + + String destination = message.getHeaders().getDestination(); + SimpleStompReactorService.this.reactor.notify("destination:" + destination, Fn.event(message)); + } + } + + private final class DisconnectConsumer implements Consumer> { + + @Override + public void accept(Event event) { + String sessionId = event.getData(); + SimpleStompReactorService.this.removeSubscriptions(sessionId); + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/stomp/support/StompMessageConverter.java b/spring-websocket/src/main/java/org/springframework/web/stomp/support/StompMessageConverter.java new file mode 100644 index 00000000000..edccc5cb26a --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/stomp/support/StompMessageConverter.java @@ -0,0 +1,207 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.stomp.support; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map.Entry; + +import org.springframework.util.Assert; +import org.springframework.web.stomp.StompCommand; +import org.springframework.web.stomp.StompException; +import org.springframework.web.stomp.StompHeaders; +import org.springframework.web.stomp.StompMessage; + +/** + * @author Gary Russell + * @since 4.0 + * + */ +public class StompMessageConverter { + + public static final byte LF = 0x0a; + + public static final byte CR = 0x0d; + + private static final byte COLON = ':'; + + /** + * @param bytes a complete STOMP message (without the trailing 0x00). + */ + public StompMessage toStompMessage(Object stomp) { + Assert.state(stomp instanceof String || stomp instanceof byte[], "'stomp' must be String or byte[]"); + byte[] stompBytes = null; + if (stomp instanceof String) { + stompBytes = ((String) stomp).getBytes(StompMessage.CHARSET); + } + else { + stompBytes = (byte[]) stomp; + } + int totalLength = stompBytes.length; + if (stompBytes[totalLength-1] == 0) { + totalLength--; + } + int payloadIndex = findPayloadStart(stompBytes); + if (payloadIndex == 0) { + throw new StompException("No command found"); + } + String headerString = new String(stompBytes, 0, payloadIndex, StompMessage.CHARSET); + Parser parser = new Parser(headerString); + StompHeaders headers = new StompHeaders(); + // TODO: validate command and whether a payload is allowed + StompCommand command = StompCommand.valueOf(parser.nextToken(LF).trim()); + Assert.notNull(command, "No command found"); + while (parser.hasNext()) { + String header = parser.nextToken(COLON); + if (header != null) { + if (parser.hasNext()) { + String value = parser.nextToken(LF); + headers.add(header, value); + } + else { + throw new StompException("Parse exception for " + headerString); + } + } + } + byte[] payload = new byte[totalLength - payloadIndex]; + System.arraycopy(stompBytes, payloadIndex, payload, 0, totalLength - payloadIndex); + return new StompMessage(command, headers, payload); + } + + public byte[] fromStompMessage(StompMessage message) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + StompHeaders headers = message.getHeaders(); + StompCommand command = message.getCommand(); + try { + outputStream.write(command.toString().getBytes("UTF-8")); + outputStream.write(LF); + for (Entry> entry : headers.entrySet()) { + String key = entry.getKey(); + key = replaceAllOutbound(key); + for (String value : entry.getValue()) { + outputStream.write(key.getBytes("UTF-8")); + outputStream.write(COLON); + value = replaceAllOutbound(value); + outputStream.write(value.getBytes("UTF-8")); + outputStream.write(LF); + } + } + outputStream.write(LF); + outputStream.write(message.getPayload()); + outputStream.write(0); + return outputStream.toByteArray(); + } + catch (IOException e) { + throw new StompException("Failed to serialize " + message, e); + } + } + + private String replaceAllOutbound(String key) { + return key.replaceAll("\\\\", "\\\\") + .replaceAll(":", "\\\\c") + .replaceAll("\n", "\\\\n") + .replaceAll("\r", "\\\\r"); + } + + private int findPayloadStart(byte[] bytes) { + int i; + // ignore any leading EOL from the previous message + for (i = 0; i < bytes.length; i++) { + if (bytes[i] != '\n' && bytes[i] != '\r' ) { + break; + } + bytes[i] = ' '; + } + int payloadOffset = 0; + for (; i < bytes.length - 1; i++) { + if ((bytes[i] == LF && bytes[i+1] == LF)) { + payloadOffset = i + 2; + break; + } + if (i < bytes.length - 3 && + (bytes[i] == CR && bytes[i+1] == LF && + bytes[i+2] == CR && bytes[i+3] == LF)) { + payloadOffset = i + 4; + break; + } + } + if (i >= bytes.length) { + throw new StompException("No end of headers found"); + } + return payloadOffset; + } + + private class Parser { + + private final String content; + + private int offset; + + public Parser(String content) { + this.content = content; + } + + public boolean hasNext() { + return this.offset < this.content.length(); + } + + public String nextToken(byte delimiter) { + if (this.offset >= this.content.length()) { + return null; + } + int delimAt = this.content.indexOf(delimiter, this.offset); + if (delimAt == -1) { + if (this.offset == this.content.length() - 1 && delimiter == COLON && + this.content.charAt(this.offset) == LF) { + this.offset++; + return null; + } + else if (this.offset == this.content.length() - 2 && delimiter == COLON && + this.content.charAt(this.offset) == CR && + this.content.charAt(this.offset + 1) == LF) { + this.offset += 2; + return null; + } + else { + throw new StompException("No delimiter found at offset " + offset + " in " + this.content); + } + } + int escapeAt = this.content.indexOf('\\', this.offset); + String token = this.content.substring(this.offset, delimAt + 1); + this.offset += token.length(); + if (escapeAt >= 0 && escapeAt < delimAt) { + char escaped = this.content.charAt(escapeAt + 1); + if (escaped == 'n' || escaped == 'c' || escaped == '\\') { + token = token.replaceAll("\\\\n", "\n") + .replaceAll("\\\\r", "\r") + .replaceAll("\\\\c", ":") + .replaceAll("\\\\\\\\", "\\\\"); + } + else { + throw new StompException("Invalid escape sequence \\" + escaped); + } + } + int length = token.length(); + if (delimiter == LF && length > 1 && token.charAt(length - 2) == CR) { + return token.substring(0, length - 2); + } + else { + return token.substring(0, length - 1); + } + } + } +}