Add Message, MessageChannel and refactor stomp support

This commit is contained in:
Rossen Stoyanchev 2013-06-09 19:36:46 -04:00
parent 8913283ce0
commit de899820c9
51 changed files with 2303 additions and 1292 deletions

View File

@ -484,7 +484,6 @@ project("spring-websocket") {
optional("org.eclipse.jetty.websocket:websocket-client:9.0.3.v20130506")
optional("com.fasterxml.jackson.core:jackson-databind:2.2.0") // required for SockJS support currently
optional("reactor:reactor-core:1.0.0.BUILD-SNAPSHOT")
optional("reactor:reactor-tcp:1.0.0.BUILD-SNAPSHOT")
}
repositories {

View File

@ -0,0 +1,103 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Base Message class defining common properties such as id, payload, and headers.
* Once created this object is immutable.
*
* @author Mark Fisher
* @since 4.0
*/
public class GenericMessage<T> implements Message<T>, Serializable {
private static final long serialVersionUID = -9004496725833093406L;
private final T payload;
private final MessageHeaders headers;
/**
* Create a new message with the given payload.
*
* @param payload the message payload
*/
public GenericMessage(T payload) {
this(payload, null);
}
/**
* Create a new message with the given payload. The provided map
* will be used to populate the message headers
*
* @param payload the message payload
* @param headers message headers
* @see MessageHeaders
*/
public GenericMessage(T payload, Map<String, Object> headers) {
Assert.notNull(payload, "payload must not be null");
if (headers == null) {
headers = new HashMap<String, Object>();
}
else {
headers = new HashMap<String, Object>(headers);
}
this.headers = new MessageHeaders(headers);
this.payload = payload;
}
public MessageHeaders getHeaders() {
return this.headers;
}
public T getPayload() {
return this.payload;
}
public String toString() {
return "[Payload=" + this.payload + "][Headers=" + this.headers + "]";
}
public int hashCode() {
return this.headers.hashCode() * 23 + ObjectUtils.nullSafeHashCode(this.payload);
}
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj instanceof GenericMessage<?>) {
GenericMessage<?> other = (GenericMessage<?>) obj;
if (!this.headers.getId().equals(other.headers.getId())) {
return false;
}
return this.headers.equals(other.headers)
&& this.payload.equals(other.payload);
}
return false;
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* A generic message representation with headers and body.
*
* @author Mark Fisher
* @author Arjen Poutsma
* @since 4.0
*/
public interface Message<T> {
MessageHeaders getHeaders();
T getPayload();
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* Base channel interface defining common behavior for sending messages.
*
* @author Mark Fisher
* @since 4.0
*/
public interface MessageChannel {
/**
* Send a {@link Message} to this channel. May throw a RuntimeException for
* non-recoverable errors. Otherwise, if the Message cannot be sent for a
* non-fatal reason this method will return 'false', and if the Message is
* sent successfully, it will return 'true'.
*
* <p>Depending on the implementation, this method may block indefinitely.
* To provide a maximum wait time, use {@link #send(Message, long)}.
*
* @param message the {@link Message} to send
*
* @return whether or not the Message has been sent successfully
*/
boolean send(Message<?> message);
}

View File

@ -0,0 +1,248 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* The headers for a {@link Message}.<br>
* IMPORTANT: MessageHeaders are immutable. Any mutating operation (e.g., put(..), putAll(..) etc.)
* will result in {@link UnsupportedOperationException}
* <p>
* TODO: update javadoc
*
* <p>To create MessageHeaders instance use fluent MessageBuilder API
* <pre>
* MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2");
* </pre>
* or create an instance of GenericMessage passing payload as {@link Object} and headers as a regular {@link Map}
* <pre>
* Map headers = new HashMap();
* headers.put("key1", "value1");
* headers.put("key2", "value2");
* new GenericMessage("foo", headers);
* </pre>
*
* @author Arjen Poutsma
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Gary Russell
* @since 4.0
*/
public final class MessageHeaders implements Map<String, Object>, Serializable {
private static final long serialVersionUID = 8946067357652612145L;
private static final Log logger = LogFactory.getLog(MessageHeaders.class);
private static volatile IdGenerator idGenerator = null;
/**
* The key for the Message ID. This is an automatically generated UUID and
* should never be explicitly set in the header map <b>except</b> in the
* case of Message deserialization where the serialized Message's generated
* UUID is being restored.
*/
public static final String ID = "id";
public static final String TIMESTAMP = "timestamp";
public static final String REPLY_CHANNEL = "replyChannel";
public static final String ERROR_CHANNEL = "errorChannel";
public static final String CONTENT_TYPE = "content-type";
// DESTINATION ?
public static final List<String> HEADER_NAMES =
Arrays.asList(ID, TIMESTAMP, REPLY_CHANNEL, ERROR_CHANNEL, CONTENT_TYPE);
private final Map<String, Object> headers;
public MessageHeaders(Map<String, Object> headers) {
this.headers = (headers != null) ? new HashMap<String, Object>(headers) : new HashMap<String, Object>();
if (MessageHeaders.idGenerator == null){
this.headers.put(ID, UUID.randomUUID());
}
else {
this.headers.put(ID, MessageHeaders.idGenerator.generateId());
}
this.headers.put(TIMESTAMP, new Long(System.currentTimeMillis()));
}
public UUID getId() {
return this.get(ID, UUID.class);
}
public Long getTimestamp() {
return this.get(TIMESTAMP, Long.class);
}
public Object getReplyChannel() {
return this.get(REPLY_CHANNEL);
}
public Object getErrorChannel() {
return this.get(ERROR_CHANNEL);
}
@SuppressWarnings("unchecked")
public <T> T get(Object key, Class<T> type) {
Object value = this.headers.get(key);
if (value == null) {
return null;
}
if (!type.isAssignableFrom(value.getClass())) {
throw new IllegalArgumentException("Incorrect type specified for header '" + key + "'. Expected [" + type
+ "] but actual type is [" + value.getClass() + "]");
}
return (T) value;
}
@Override
public int hashCode() {
return this.headers.hashCode();
}
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object != null && object instanceof MessageHeaders) {
MessageHeaders other = (MessageHeaders) object;
return this.headers.equals(other.headers);
}
return false;
}
@Override
public String toString() {
return this.headers.toString();
}
/*
* Map implementation
*/
public boolean containsKey(Object key) {
return this.headers.containsKey(key);
}
public boolean containsValue(Object value) {
return this.headers.containsValue(value);
}
public Set<Map.Entry<String, Object>> entrySet() {
return Collections.unmodifiableSet(this.headers.entrySet());
}
public Object get(Object key) {
return this.headers.get(key);
}
public boolean isEmpty() {
return this.headers.isEmpty();
}
public Set<String> keySet() {
return Collections.unmodifiableSet(this.headers.keySet());
}
public int size() {
return this.headers.size();
}
public Collection<Object> values() {
return Collections.unmodifiableCollection(this.headers.values());
}
// Unsupported operations
/**
* Since MessageHeaders are immutable the call to this method will result in {@link UnsupportedOperationException}
*/
public Object put(String key, Object value) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
/**
* Since MessageHeaders are immutable the call to this method will result in {@link UnsupportedOperationException}
*/
public void putAll(Map<? extends String, ? extends Object> t) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
/**
* Since MessageHeaders are immutable the call to this method will result in {@link UnsupportedOperationException}
*/
public Object remove(Object key) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
/**
* Since MessageHeaders are immutable the call to this method will result in {@link UnsupportedOperationException}
*/
public void clear() {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
// Serialization methods
private void writeObject(ObjectOutputStream out) throws IOException {
List<String> keysToRemove = new ArrayList<String>();
for (Map.Entry<String, Object> entry : this.headers.entrySet()) {
if (!(entry.getValue() instanceof Serializable)) {
keysToRemove.add(entry.getKey());
}
}
for (String key : keysToRemove) {
if (logger.isInfoEnabled()) {
logger.info("removing non-serializable header: " + key);
}
this.headers.remove(key);
}
out.defaultWriteObject();
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
}
public static interface IdGenerator {
UUID generateId();
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* The base exception for any failures related to messaging.
*
* @author Mark Fisher
* @author Gary Russell
* @since 4.0
*/
@SuppressWarnings("serial")
public class MessagingException extends RuntimeException {
private volatile Message<?> failedMessage;
public MessagingException(Message<?> message) {
super();
this.failedMessage = message;
}
public MessagingException(String description) {
super(description);
this.failedMessage = null;
}
public MessagingException(String description, Throwable cause) {
super(description, cause);
this.failedMessage = null;
}
public MessagingException(Message<?> message, String description) {
super(description);
this.failedMessage = message;
}
public MessagingException(Message<?> message, Throwable cause) {
super(cause);
this.failedMessage = message;
}
public MessagingException(Message<?> message, String description, Throwable cause) {
super(description, cause);
this.failedMessage = message;
}
public Message<?> getFailedMessage() {
return this.failedMessage;
}
public void setFailedMessage(Message<?> message) {
this.failedMessage = message;
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.messaging;
package org.springframework.messaging.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@ -33,14 +33,8 @@ import java.lang.annotation.Target;
public @interface MessageMapping {
/**
* The primary mapping expressed by this annotation.
* <p>The destination for a message (e.g. "/topic/echo").
* Destination values for the message.
*/
String[] value() default {};
/**
* TODO
*/
MessageType messageType() default MessageType.NONE;
}

View File

@ -0,0 +1,4 @@
/**
* Generic support for working with messaging APIs and protocols.
*/
package org.springframework.messaging;

View File

@ -23,6 +23,16 @@ package org.springframework.web.messaging;
*/
public enum MessageType {
CONNECT, SUBSCRIBE, UNSUBSCRIBE, SEND, OTHER, NONE
CONNECT,
MESSAGE,
SUBSCRIBE,
UNSUBSCRIBE,
DISCONNECT,
OTHER;
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.messaging;
package org.springframework.web.messaging.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;

View File

@ -0,0 +1,40 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SubscribeEvent {
/**
* Destination value(s) for the subscription.
*/
String[] value() default {};
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface UnsubscribeEvent {
/**
* Destination value(s) for the subscription.
*/
String[] value() default {};
}

View File

@ -37,7 +37,7 @@ import org.springframework.util.Assert;
* @author Arjen Poutsma
* @since 4.0
*/
public abstract class AbstractMessageConverter<T> implements MessageConverter<T> {
public abstract class AbstractMessageConverter implements MessageConverter {
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
@ -79,7 +79,6 @@ public abstract class AbstractMessageConverter<T> implements MessageConverter<T>
this.supportedMediaTypes = new ArrayList<MediaType>(supportedMediaTypes);
}
@Override
public List<MediaType> getSupportedMediaTypes() {
return Collections.unmodifiableList(this.supportedMediaTypes);
}
@ -159,8 +158,8 @@ public abstract class AbstractMessageConverter<T> implements MessageConverter<T>
* implementations might add some default behavior, however.
*/
@Override
public T convertFromPayload(Class<? extends T> clazz, MediaType contentType, byte[] payload)
throws IOException {
public Object convertFromPayload(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException, ContentTypeNotSupportedException {
return convertFromPayloadInternal(clazz, contentType, payload);
}
@ -173,8 +172,8 @@ public abstract class AbstractMessageConverter<T> implements MessageConverter<T>
* @return the converted object
* @throws IOException in case of I/O errors
*/
protected abstract T convertFromPayloadInternal(Class<? extends T> clazz, MediaType contentType,
byte[] payload) throws IOException;
protected abstract Object convertFromPayloadInternal(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException, ContentTypeNotSupportedException;
/**
* This implementation simply delegates to
@ -182,25 +181,13 @@ public abstract class AbstractMessageConverter<T> implements MessageConverter<T>
* implementations might add some default behavior, however.
*/
@Override
public byte[] convertToPayload(T t, MediaType contentType) throws IOException {
return convertToPayloadInternal(t, contentType);
public byte[] convertToPayload(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
return convertToPayloadInternal(content, contentType);
}
protected abstract byte[] convertToPayloadInternal(T t, MediaType contentType)
throws IOException;
/**
* Returns the default content type for the given type. Called when {@link #write}
* is invoked without a specified content type parameter.
* <p>By default, this returns the first element of the
* {@link #setSupportedMediaTypes(List) supportedMediaTypes} property, if any.
* Can be overridden in subclasses.
* @param t the type to return the content type for
* @return the content type, or {@code null} if not known
*/
protected MediaType getDefaultContentType(T t) throws IOException {
List<MediaType> mediaTypes = getSupportedMediaTypes();
return (!mediaTypes.isEmpty() ? mediaTypes.get(0) : null);
}
protected abstract byte[] convertToPayloadInternal(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException;
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import org.springframework.http.MediaType;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ByteArrayMessageConverter implements MessageConverter {
@Override
public boolean canConvertFromPayload(Class<?> clazz, MediaType contentType) {
return byte[].class.equals(clazz);
}
@Override
public Object convertFromPayload(Class<?> clazz, MediaType contentType, byte[] payload) {
return payload;
}
@Override
public boolean canConvertToPayload(Class<?> clazz, MediaType mediaType) {
return byte[].class.equals(clazz);
}
@Override
public byte[] convertToPayload(Object content, MediaType contentType) {
return (byte[]) content;
}
}

View File

@ -0,0 +1,99 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.springframework.http.MediaType;
import org.springframework.util.ClassUtils;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class CompositeMessageConverter implements MessageConverter {
private static final boolean jackson2Present =
ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", CompositeMessageConverter.class.getClassLoader()) &&
ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", CompositeMessageConverter.class.getClassLoader());
private final List<MessageConverter> converters;
public CompositeMessageConverter(List<MessageConverter> converters) {
if (converters == null) {
this.converters = new ArrayList<MessageConverter>();
this.converters.add(new ByteArrayMessageConverter());
this.converters.add(new StringMessageConverter());
if (jackson2Present) {
this.converters.add(new MappingJackson2MessageConverter());
}
}
else {
this.converters = converters;
}
}
@Override
public boolean canConvertFromPayload(Class<?> clazz, MediaType contentType) {
for (MessageConverter converter : this.converters) {
if (converter.canConvertFromPayload(clazz, contentType)) {
return true;
}
}
return false;
}
@Override
public Object convertFromPayload(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException, ContentTypeNotSupportedException {
for (MessageConverter converter : this.converters) {
if (converter.canConvertFromPayload(clazz, contentType)) {
return converter.convertFromPayload(clazz, contentType, payload);
}
}
throw new ContentTypeNotSupportedException(contentType, clazz);
}
@Override
public boolean canConvertToPayload(Class<?> clazz, MediaType mediaType) {
for (MessageConverter converter : this.converters) {
if (converter.canConvertToPayload(clazz, mediaType)) {
return true;
}
}
return false;
}
@Override
public byte[] convertToPayload(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
for (MessageConverter converter : this.converters) {
if (converter.canConvertToPayload(content.getClass(), contentType)) {
return converter.convertToPayload(content, contentType);
}
}
throw new ContentTypeNotSupportedException(contentType, content.getClass());
}
}

View File

@ -48,7 +48,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
* @author Arjen Poutsma
* @since 4.0
*/
public class MappingJackson2MessageConverter extends AbstractMessageConverter<Object> {
public class MappingJackson2MessageConverter extends AbstractMessageConverter {
private ObjectMapper objectMapper = new ObjectMapper();

View File

@ -0,0 +1,37 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
@SuppressWarnings("serial")
public class MessageConversionException extends MessagingException {
public MessageConversionException(String description, Throwable cause) {
super(description, cause);
}
public MessageConversionException(Message<?> failedMessage, String description, Throwable cause) {
super(failedMessage, description, cause);
}
}

View File

@ -17,21 +17,20 @@
package org.springframework.web.messaging.converter;
import java.io.IOException;
import java.util.List;
import org.springframework.http.MediaType;
/**
* Strategy for converting byte a array message payload to and from a typed object.
* Strategy for converting a byte array message payload to and from a typed object.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface MessageConverter<T> {
public interface MessageConverter {
/**
* Whether the given class can be converted from a byte array by this converter.
* Whether instances of the given class can be converted to from a byte array.
*
* @param clazz the class to convert from
* @param mediaType the media type of the content, can be {@code null} if not
@ -41,7 +40,7 @@ public interface MessageConverter<T> {
boolean canConvertFromPayload(Class<?> clazz, MediaType mediaType);
/**
* Convert the payload of the given type.
* Convert the byte array payload to the given type.
*
* @param clazz the type of object to return. This type must have previously been
* passed to {@link #canConvertFromPayload(Class, MediaType)} and it must have
@ -51,10 +50,11 @@ public interface MessageConverter<T> {
* @return the converted object
* @throws IOException in case of I/O errors
*/
T convertFromPayload(Class<? extends T> clazz, MediaType contentType, byte[] payload) throws IOException;
Object convertFromPayload(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException, ContentTypeNotSupportedException;
/**
* Whether the given class can be converted to a byte array by this converter.
* Whether instances of the given class can be converted to a byte array.
*
* @param clazz the class to test
* @param mediaType the media type of the content, can be {@code null} if not specified.
@ -72,13 +72,6 @@ public interface MessageConverter<T> {
* @return the output message
* @throws IOException in case of I/O errors
*/
byte[] convertToPayload(T t, MediaType contentType) throws IOException;
/**
* Return the list of {@link MediaType} objects supported by this converter.
*
* @return the list of supported media types
*/
List<MediaType> getSupportedMediaTypes();
byte[] convertToPayload(Object content, MediaType contentType) throws IOException, ContentTypeNotSupportedException;
}

View File

@ -26,7 +26,7 @@ import org.springframework.http.MediaType;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StringMessageConverter extends AbstractMessageConverter<String> {
public class StringMessageConverter extends AbstractMessageConverter {
private static final Charset UTF_8 = Charset.forName("UTF-8");
@ -41,15 +41,15 @@ public class StringMessageConverter extends AbstractMessageConverter<String> {
}
@Override
protected String convertFromPayloadInternal(Class<? extends String> clazz, MediaType contentType,
byte[] payload) throws IOException {
protected String convertFromPayloadInternal(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException {
return new String(payload, UTF_8);
}
@Override
protected byte[] convertToPayloadInternal(String content, MediaType contentType) throws IOException {
return content.getBytes(UTF_8);
protected byte[] convertToPayloadInternal(Object content, MediaType contentType) throws IOException {
return ((String) content).getBytes(UTF_8);
}
}

View File

@ -14,21 +14,17 @@
* limitations under the License.
*/
package org.springframework.web.messaging;
import java.io.IOException;
import org.springframework.http.MediaType;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
package org.springframework.web.messaging.event;
/**
* @author Rossen Stoyanchev
* @since 4.0
*
*/
public interface MessageBroker {
public interface EventBus {
void send(String destination, Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException;
void send(String key, Object data);
EventRegistration registerConsumer(String key, EventConsumer consumer);
}

View File

@ -14,22 +14,16 @@
* limitations under the License.
*/
package org.springframework.web.messaging;
package org.springframework.web.messaging.event;
import java.io.IOException;
import org.springframework.http.MediaType;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
/**
* @author Rossen Stoyanchev
* @since 4.0
*
*/
public interface Subscription {
public interface EventConsumer<T> {
void reply(Object content) throws IOException, ContentTypeNotSupportedException;
void reply(Object content, MediaType mediaType) throws IOException, ContentTypeNotSupportedException;
void accept(T data);
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.event;
/**
*
*/
public interface EventRegistration {
String getRegistrationKey();
void cancel();
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.event;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
import reactor.fn.registry.Registration;
import reactor.fn.selector.ObjectSelector;
/**
*
*/
public class ReactorEventBus implements EventBus {
private final Reactor reactor;
public ReactorEventBus(Reactor reactor) {
this.reactor = reactor;
}
@Override
public void send(String key, Object data) {
this.reactor.notify(key, Event.wrap(data));
}
@Override
public EventRegistration registerConsumer(final String key, final EventConsumer consumer) {
ObjectSelector<String> selector = new ObjectSelector<String>(key);
final Registration<Consumer<Event<Object>>> registration = this.reactor.on(selector,
new Consumer<Event<Object>>() {
@Override
public void accept(Event<Object> event) {
consumer.accept(event.getData());
}
});
return new EventRegistration() {
@Override
public String getRegistrationKey() {
return key;
}
@Override
public void cancel() {
registration.cancel();
}
};
}
}

View File

@ -0,0 +1,176 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.PathMatcher;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.event.EventBus;
import org.springframework.web.messaging.event.EventConsumer;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractMessageService {
public static final String MESSAGE_KEY = "messageKey";
public static final String CLIENT_CONNECTION_CLOSED_KEY = "clientConnectionClosed";
protected final Log logger = LogFactory.getLog(getClass());
private final EventBus eventBus;
private final List<String> allowedDestinations = new ArrayList<String>();
private final List<String> disallowedDestinations = new ArrayList<String>();
private final PathMatcher pathMatcher = new AntPathMatcher();
public AbstractMessageService(EventBus reactor) {
Assert.notNull(reactor, "reactor is required");
this.eventBus = reactor;
this.eventBus.registerConsumer(MESSAGE_KEY, new EventConsumer<Message<?>>() {
@Override
public void accept(Message<?> message) {
if (!isAllowedDestination(message)) {
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Processing notification: " + message);
}
MessageType messageType = (MessageType) message.getHeaders().get("messageType");
if (messageType == null || messageType.equals(MessageType.OTHER)) {
processOther(message);
}
else if (MessageType.CONNECT.equals(messageType)) {
processConnect(message);
}
else if (MessageType.MESSAGE.equals(messageType)) {
processMessage(message);
}
else if (MessageType.SUBSCRIBE.equals(messageType)) {
processSubscribe(message);
}
else if (MessageType.UNSUBSCRIBE.equals(messageType)) {
processUnsubscribe(message);
}
else if (MessageType.DISCONNECT.equals(messageType)) {
processDisconnect(message);
}
}
});
this.eventBus.registerConsumer(CLIENT_CONNECTION_CLOSED_KEY, new EventConsumer<String>() {
@Override
public void accept(String sessionId) {
processClientConnectionClosed(sessionId);
}
});
}
/**
* Ant-style destination patterns that this service is allowed to process.
*/
public void setAllowedDestinations(String... patterns) {
this.allowedDestinations.clear();
this.allowedDestinations.addAll(Arrays.asList(patterns));
}
/**
* Ant-style destination patterns that this service should skip.
*/
public void setDisallowedDestinations(String... patterns) {
this.disallowedDestinations.clear();
this.disallowedDestinations.addAll(Arrays.asList(patterns));
}
public EventBus getEventBus() {
return this.eventBus;
}
private boolean isAllowedDestination(Message<?> message) {
String destination = (String) message.getHeaders().get("destination");
if (destination == null) {
return true;
}
if (!this.disallowedDestinations.isEmpty()) {
for (String pattern : this.disallowedDestinations) {
if (this.pathMatcher.match(pattern, destination)) {
if (logger.isTraceEnabled()) {
logger.trace("Skip notification: " + message);
}
return false;
}
}
}
if (!this.allowedDestinations.isEmpty()) {
for (String pattern : this.allowedDestinations) {
if (this.pathMatcher.match(pattern, destination)) {
return true;
}
}
if (logger.isTraceEnabled()) {
logger.trace("Skip notification: " + message);
}
return false;
}
return true;
}
protected void processConnect(Message<?> message) {
}
protected void processMessage(Message<?> message) {
}
protected void processSubscribe(Message<?> message) {
}
protected void processUnsubscribe(Message<?> message) {
}
protected void processDisconnect(Message<?> message) {
}
protected void processOther(Message<?> message) {
}
protected void processClientConnectionClosed(String sessionId) {
}
}

View File

@ -0,0 +1,135 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.http.MediaType;
import org.springframework.messaging.GenericMessage;
import org.springframework.messaging.Message;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.event.EventBus;
import org.springframework.web.messaging.event.EventConsumer;
import org.springframework.web.messaging.event.EventRegistration;
import reactor.fn.Event;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class PubSubMessageService extends AbstractMessageService {
private MessageConverter payloadConverter;
private Map<String, List<EventRegistration>> subscriptionsBySession =
new ConcurrentHashMap<String, List<EventRegistration>>();
public PubSubMessageService(EventBus reactor) {
super(reactor);
this.payloadConverter = new CompositeMessageConverter(null);
}
public void setMessageConverters(List<MessageConverter> converters) {
this.payloadConverter = new CompositeMessageConverter(converters);
}
@Override
protected void processMessage(Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("Message received: " + message);
}
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("destination", message.getHeaders().get("destination"));
MediaType contentType = (MediaType) message.getHeaders().get("content-type");
headers.put("content-type", contentType);
try {
// Convert to byte[] payload before the fan-out
byte[] payload = payloadConverter.convertToPayload(message.getPayload(), contentType);
message = new GenericMessage<byte[]>(payload, headers);
getEventBus().send(getPublishKey(message), Event.wrap(message));
}
catch (Exception ex) {
logger.error("Failed to publish " + message, ex);
}
}
private String getPublishKey(Message<?> message) {
return "destination:" + (String) message.getHeaders().get("destination");
}
@Override
protected void processSubscribe(Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("Subscribe " + message);
}
final String replyKey = (String) message.getHeaders().getReplyChannel();
EventRegistration registration = getEventBus().registerConsumer(getPublishKey(message),
new EventConsumer<Message<?>>() {
@Override
public void accept(Message<?> message) {
getEventBus().send(replyKey, message);
}
});
addSubscription((String) message.getHeaders().get("sessionId"), registration);
}
private void addSubscription(String sessionId, EventRegistration registration) {
List<EventRegistration> list = this.subscriptionsBySession.get(sessionId);
if (list == null) {
list = new ArrayList<EventRegistration>();
this.subscriptionsBySession.put(sessionId, list);
}
list.add(registration);
}
@Override
public void processDisconnect(Message<?> message) {
String sessionId = (String) message.getHeaders().get("sessionId");
removeSubscriptions(sessionId);
}
@Override
protected void processClientConnectionClosed(String sessionId) {
removeSubscriptions(sessionId);
}
private void removeSubscriptions(String sessionId) {
List<EventRegistration> registrations = this.subscriptionsBySession.remove(sessionId);
if (logger.isTraceEnabled()) {
logger.trace("Cancelling " + registrations.size() + " subscriptions for session=" + sessionId);
}
for (EventRegistration registration : registrations) {
registration.cancel();
}
}
}

View File

@ -0,0 +1,259 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils.MethodFilter;
import org.springframework.web.messaging.annotation.SubscribeEvent;
import org.springframework.web.messaging.annotation.UnsubscribeEvent;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.event.EventBus;
import org.springframework.web.messaging.service.AbstractMessageService;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.method.HandlerMethodSelector;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class AnnotationMessageService extends AbstractMessageService implements ApplicationContextAware, InitializingBean {
private List<MessageConverter> messageConverters;
private ApplicationContext applicationContext;
private Map<MappingInfo, HandlerMethod> messageMethods = new HashMap<MappingInfo, HandlerMethod>();
private Map<MappingInfo, HandlerMethod> subscribeMethods = new HashMap<MappingInfo, HandlerMethod>();
private Map<MappingInfo, HandlerMethod> unsubscribeMethods = new HashMap<MappingInfo, HandlerMethod>();
private ArgumentResolverComposite argumentResolvers = new ArgumentResolverComposite();
private ReturnValueHandlerComposite returnValueHandlers = new ReturnValueHandlerComposite();
public AnnotationMessageService(EventBus eventBus) {
super(eventBus);
}
public void setMessageConverters(List<MessageConverter> converters) {
this.messageConverters = converters;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void afterPropertiesSet() {
initHandlerMethods();
this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(getEventBus()));
this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters));
this.returnValueHandlers.addHandler(new MessageReturnValueHandler(getEventBus()));
}
protected void initHandlerMethods() {
String[] beanNames = this.applicationContext.getBeanNamesForType(Object.class);
for (String beanName : beanNames) {
if (isHandler(this.applicationContext.getType(beanName))){
detectHandlerMethods(beanName);
}
}
}
protected boolean isHandler(Class<?> beanType) {
return ((AnnotationUtils.findAnnotation(beanType, Controller.class) != null) ||
(AnnotationUtils.findAnnotation(beanType, MessageMapping.class) != null));
}
protected void detectHandlerMethods(Object handler) {
Class<?> handlerType = (handler instanceof String) ?
this.applicationContext.getType((String) handler) : handler.getClass();
final Class<?> userType = ClassUtils.getUserClass(handlerType);
initHandlerMethods(handler, userType, MessageMapping.class,
new MessageMappingInfoCreator(), this.messageMethods);
initHandlerMethods(handler, userType, SubscribeEvent.class,
new SubscribeMappingInfoCreator(), this.subscribeMethods);
initHandlerMethods(handler, userType, UnsubscribeEvent.class,
new UnsubscribeMappingInfoCreator(), this.unsubscribeMethods);
}
@SuppressWarnings("unchecked")
private <A extends Annotation> void initHandlerMethods(Object handler, Class<?> handlerType,
final Class<A> annotationType, MappingInfoCreator mappingInfoCreator,
Map<MappingInfo, HandlerMethod> handlerMethods) {
Set<Method> messageMethods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() {
@Override
public boolean matches(Method method) {
return AnnotationUtils.findAnnotation(method, annotationType) != null;
}
});
for (Method method : messageMethods) {
A annotation = AnnotationUtils.findAnnotation(method, annotationType);
HandlerMethod hm = createHandlerMethod(handler, method);
handlerMethods.put(mappingInfoCreator.create(annotation), hm);
}
}
protected HandlerMethod createHandlerMethod(Object handler, Method method) {
HandlerMethod handlerMethod;
if (handler instanceof String) {
String beanName = (String) handler;
handlerMethod = new HandlerMethod(beanName, this.applicationContext, method);
}
else {
handlerMethod = new HandlerMethod(handler, method);
}
return handlerMethod;
}
@Override
protected void processMessage(Message<?> message) {
handleMessage(message, this.messageMethods);
}
@Override
protected void processSubscribe(Message<?> message) {
handleMessage(message, this.subscribeMethods);
}
@Override
protected void processUnsubscribe(Message<?> message) {
handleMessage(message, this.unsubscribeMethods);
}
private void handleMessage(final Message<?> message, Map<MappingInfo, HandlerMethod> handlerMethods) {
String destination = (String) message.getHeaders().get("destination");
HandlerMethod match = getHandlerMethod(destination, handlerMethods);
if (match == null) {
return;
}
HandlerMethod handlerMethod = match.createWithResolvedBean();
// TODO:
InvocableMessageHandlerMethod invocableHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod);
invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
try {
Object value = invocableHandlerMethod.invoke(message);
MethodParameter returnType = handlerMethod.getReturnType();
if (void.class.equals(returnType.getParameterType())) {
return;
}
this.returnValueHandlers.handleReturnValue(value, returnType, message);
}
catch (Throwable e) {
// TODO: send error message, or add @ExceptionHandler-like capability
e.printStackTrace();
}
}
protected HandlerMethod getHandlerMethod(String destination, Map<MappingInfo, HandlerMethod> handlerMethods) {
for (MappingInfo key : handlerMethods.keySet()) {
for (String mappingDestination : key.getDestinations()) {
if (destination.equals(mappingDestination)) {
return handlerMethods.get(key);
}
}
}
return null;
}
private static class MappingInfo {
private final List<String> destinations;
public MappingInfo(List<String> destinations) {
this.destinations = destinations;
}
public List<String> getDestinations() {
return this.destinations;
}
@Override
public String toString() {
return "MappingInfo [destinations=" + this.destinations + "]";
}
}
private interface MappingInfoCreator<A extends Annotation> {
MappingInfo create(A annotation);
}
private static class MessageMappingInfoCreator implements MappingInfoCreator<MessageMapping> {
@Override
public MappingInfo create(MessageMapping annotation) {
return new MappingInfo(Arrays.asList(annotation.value()));
}
}
private static class SubscribeMappingInfoCreator implements MappingInfoCreator<SubscribeEvent> {
@Override
public MappingInfo create(SubscribeEvent annotation) {
return new MappingInfo(Arrays.asList(annotation.value()));
}
}
private static class UnsubscribeMappingInfoCreator implements MappingInfoCreator<UnsubscribeEvent> {
@Override
public MappingInfo create(UnsubscribeEvent annotation) {
return new MappingInfo(Arrays.asList(annotation.value()));
}
}
}

View File

@ -14,10 +14,10 @@
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
package org.springframework.web.messaging.service.method;
import org.springframework.core.MethodParameter;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.messaging.Message;
/**
@ -27,7 +27,7 @@ import org.springframework.web.messaging.stomp.StompMessage;
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface MessageMethodArgumentResolver {
public interface ArgumentResolver {
/**
* Whether the given {@linkplain MethodParameter method parameter} is
@ -48,12 +48,11 @@ public interface MessageMethodArgumentResolver {
* {@link #supportsParameter(org.springframework.core.MethodParameter)}
* and it must have returned {@code true}
* @param message
* @param replyTo
*
* @return the resolved argument value, or {@code null}.
*
* @throws Exception in case of errors with the preparation of argument values
*/
Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo) throws Exception;
Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception;
}

View File

@ -0,0 +1,116 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* Resolves method parameters by delegating to a list of registered
* {@link ArgumentResolver}. Previously resolved method parameters are cached
* for faster lookups.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ArgumentResolverComposite implements ArgumentResolver {
protected final Log logger = LogFactory.getLog(getClass());
private final List<ArgumentResolver> argumentResolvers =
new LinkedList<ArgumentResolver>();
private final Map<MethodParameter, ArgumentResolver> argumentResolverCache =
new ConcurrentHashMap<MethodParameter, ArgumentResolver>(256);
/**
* Return a read-only list with the contained resolvers, or an empty list.
*/
public List<ArgumentResolver> getResolvers() {
return Collections.unmodifiableList(this.argumentResolvers);
}
/**
* Whether the given {@linkplain MethodParameter method parameter} is supported by any registered
* {@link ArgumentResolver}.
*/
@Override
public boolean supportsParameter(MethodParameter parameter) {
return getArgumentResolver(parameter) != null;
}
/**
* Iterate over registered {@link ArgumentResolver}s and invoke the one that supports it.
* @exception IllegalStateException if no suitable {@link ArgumentResolver} is found.
*/
@Override
public Object resolveArgument(MethodParameter parameter, Message message) throws Exception {
ArgumentResolver resolver = getArgumentResolver(parameter);
Assert.notNull(resolver, "Unknown parameter type [" + parameter.getParameterType().getName() + "]");
return resolver.resolveArgument(parameter, message);
}
/**
* Find a registered {@link ArgumentResolver} that supports the given method parameter.
*/
private ArgumentResolver getArgumentResolver(MethodParameter parameter) {
ArgumentResolver result = this.argumentResolverCache.get(parameter);
if (result == null) {
for (ArgumentResolver methodArgumentResolver : this.argumentResolvers) {
if (methodArgumentResolver.supportsParameter(parameter)) {
result = methodArgumentResolver;
this.argumentResolverCache.put(parameter, result);
break;
}
}
}
return result;
}
/**
* Add the given {@link ArgumentResolver}.
*/
public ArgumentResolverComposite addResolver(ArgumentResolver argumentResolver) {
this.argumentResolvers.add(argumentResolver);
return this;
}
/**
* Add the given {@link ArgumentResolver}s.
*/
public ArgumentResolverComposite addResolvers(
List<? extends ArgumentResolver> argumentResolvers) {
if (argumentResolvers != null) {
for (ArgumentResolver resolver : argumentResolvers) {
this.argumentResolvers.add(resolver);
}
}
return this;
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
package org.springframework.web.messaging.service.method;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
@ -23,21 +23,21 @@ import org.springframework.core.GenericTypeResolver;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.MethodParameter;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.messaging.Message;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.WebDataBinder;
import org.springframework.web.bind.support.WebDataBinderFactory;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.method.HandlerMethod;
/**
* Invokes the handler method for a given message after resolving
* its method argument values through registered {@link MessageMethodArgumentResolver}s.
* its method argument values through registered {@link ArgumentResolver}s.
* <p>
* Argument resolution often requires a {@link WebDataBinder} for data binding or for type
* conversion. Use the {@link #setDataBinderFactory(WebDataBinderFactory)} property to
* supply a binder factory to pass to argument resolvers.
* <p>
* Use {@link #setMessageMethodArgumentResolvers(MessageMethodArgumentResolverComposite)}
* Use {@link #setMessageMethodArgumentResolvers(ArgumentResolverComposite)}
* to customize the list of argument resolvers.
*
* @author Rossen Stoyanchev
@ -45,7 +45,7 @@ import org.springframework.web.method.HandlerMethod;
*/
public class InvocableMessageHandlerMethod extends HandlerMethod {
private MessageMethodArgumentResolverComposite argumentResolvers = new MessageMethodArgumentResolverComposite();
private ArgumentResolverComposite argumentResolvers = new ArgumentResolverComposite();
private ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
@ -72,10 +72,10 @@ public class InvocableMessageHandlerMethod extends HandlerMethod {
}
/**
* Set {@link MessageMethodArgumentResolver}s to use to use for resolving method
* Set {@link ArgumentResolver}s to use to use for resolving method
* argument values.
*/
public void setMessageMethodArgumentResolvers(MessageMethodArgumentResolverComposite argumentResolvers) {
public void setMessageMethodArgumentResolvers(ArgumentResolverComposite argumentResolvers) {
this.argumentResolvers = argumentResolvers;
}
@ -97,9 +97,9 @@ public class InvocableMessageHandlerMethod extends HandlerMethod {
* @exception Exception raised if no suitable argument resolver can be found, or the
* method raised an exception
*/
public final Object invoke(StompMessage message, Object replyTo) throws Exception {
public final Object invoke(Message<?> message) throws Exception {
Object[] args = getMethodArgumentValues(message, replyTo);
Object[] args = getMethodArgumentValues(message);
if (logger.isTraceEnabled()) {
StringBuilder builder = new StringBuilder("Invoking [");
@ -120,7 +120,7 @@ public class InvocableMessageHandlerMethod extends HandlerMethod {
/**
* Get the method argument values for the current request.
*/
private Object[] getMethodArgumentValues(StompMessage message, Object replyTo) throws Exception {
private Object[] getMethodArgumentValues(Message<?> message) throws Exception {
MethodParameter[] parameters = getMethodParameters();
Object[] args = new Object[parameters.length];
@ -136,7 +136,7 @@ public class InvocableMessageHandlerMethod extends HandlerMethod {
if (this.argumentResolvers.supportsParameter(parameter)) {
try {
args[i] = this.argumentResolvers.resolveArgument(parameter, message, replyTo);
args[i] = this.argumentResolvers.resolveArgument(parameter, message);
continue;
} catch (Exception ex) {
if (logger.isTraceEnabled()) {

View File

@ -0,0 +1,73 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import org.springframework.web.messaging.annotation.MessageBody;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConversionException;
import org.springframework.web.messaging.converter.MessageConverter;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageBodyArgumentResolver implements ArgumentResolver {
private final MessageConverter converter;
public MessageBodyArgumentResolver(List<MessageConverter> converters) {
this.converter = new CompositeMessageConverter(converters);
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return true;
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
Object arg = null;
MessageBody annot = parameter.getParameterAnnotation(MessageBody.class);
MediaType contentType = (MediaType) message.getHeaders().get("content-type");
if (annot == null || annot.required()) {
Class<?> sourceType = message.getPayload().getClass();
Class<?> parameterType = parameter.getParameterType();
if (parameterType.isAssignableFrom(sourceType)) {
return message.getPayload();
}
else if (byte[].class.equals(sourceType)) {
return this.converter.convertFromPayload(parameterType, contentType, (byte[]) message.getPayload());
}
else {
throw new MessageConversionException(message, "Unexpected payload type", null);
}
}
return arg;
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.GenericMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.event.EventBus;
import org.springframework.web.messaging.service.AbstractMessageService;
import reactor.util.Assert;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageChannelArgumentResolver implements ArgumentResolver {
private static Log logger = LogFactory.getLog(MessageChannelArgumentResolver.class);
private final EventBus eventBus;
public MessageChannelArgumentResolver(EventBus eventBus) {
Assert.notNull(eventBus, "reactor is required");
this.eventBus = eventBus;
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return MessageChannel.class.equals(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
final String sessionId = (String) message.getHeaders().get("sessionId");
return new MessageChannel() {
@Override
public boolean send(Message<?> message) {
Map<String, Object> headers = new HashMap<String, Object>(message.getHeaders());
headers.put("messageType", MessageType.MESSAGE);
headers.put("sessionId", sessionId);
message = new GenericMessage<Object>(message.getPayload(), headers);
if (logger.isTraceEnabled()) {
logger.trace("Sending notification: " + message);
}
String key = AbstractMessageService.MESSAGE_KEY;
MessageChannelArgumentResolver.this.eventBus.send(key, message);
return true;
}
};
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.web.messaging.event.EventBus;
import reactor.util.Assert;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageReturnValueHandler implements ReturnValueHandler {
private static Log logger = LogFactory.getLog(MessageReturnValueHandler.class);
private final EventBus eventBus;
public MessageReturnValueHandler(EventBus eventBus) {
this.eventBus = eventBus;
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
Class<?> paramType = returnType.getParameterType();
return Message.class.isAssignableFrom(paramType);
// if (Message.class.isAssignableFrom(paramType)) {
// return true;
// }
// else if (List.class.isAssignableFrom(paramType)) {
// Type type = returnType.getGenericParameterType();
// if (type instanceof ParameterizedType) {
// Type genericType = ((ParameterizedType) type).getActualTypeArguments()[0];
// }
// }
// return Message.class.isAssignableFrom(paramType);
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
Message<?> returnMessage = (Message<?>) returnValue;
if (returnMessage == null) {
return;
}
String replyTo = (String) message.getHeaders().getReplyChannel();
Assert.notNull(replyTo, "Cannot reply to: " + message);
if (logger.isTraceEnabled()) {
logger.trace("Sending notification: " + message);
}
this.eventBus.send(replyTo, returnMessage);
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
/**
* Strategy interface to handle the value returned from the invocation of a
* handler method .
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface ReturnValueHandler {
/**
* Whether the given {@linkplain MethodParameter method return type} is
* supported by this handler.
*
* @param returnType the method return type to check
* @return {@code true} if this handler supports the supplied return type;
* {@code false} otherwise
*/
boolean supportsReturnType(MethodParameter returnType);
/**
* Handle the given return value.
*
* @param returnValue the value returned from the handler method
* @param returnType the type of the return value. This type must have
* previously been passed to
* {@link #supportsReturnType(org.springframework.core.MethodParameter)}
* and it must have returned {@code true}
* @param message the message that caused this method to be called
* @throws Exception if the return value handling results in an error
*/
void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) throws Exception;
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import java.util.ArrayList;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReturnValueHandlerComposite implements ReturnValueHandler {
private final List<ReturnValueHandler> returnValueHandlers =
new ArrayList<ReturnValueHandler>();
/**
* Add the given {@link ReturnValueHandler}.
*/
public ReturnValueHandlerComposite addHandler(ReturnValueHandler returnValuehandler) {
this.returnValueHandlers.add(returnValuehandler);
return this;
}
/**
* Add the given {@link ReturnValueHandler}s.
*/
public ReturnValueHandlerComposite addHandlers(List<? extends ReturnValueHandler> handlers) {
if (handlers != null) {
for (ReturnValueHandler handler : handlers) {
this.returnValueHandlers.add(handler);
}
}
return this;
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return getReturnValueHandler(returnType) != null;
}
private ReturnValueHandler getReturnValueHandler(MethodParameter returnType) {
for (ReturnValueHandler handler : this.returnValueHandlers) {
if (handler.supportsReturnType(returnType)) {
return handler;
}
}
return null;
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
ReturnValueHandler handler = getReturnValueHandler(returnType);
Assert.notNull(handler, "Unknown return value type [" + returnType.getParameterType().getName() + "]");
handler.handleReturnValue(returnValue, returnType, message);
}
}

View File

@ -16,8 +16,6 @@
package org.springframework.web.messaging.stomp;
import org.springframework.web.messaging.MessageType;
/**
*
@ -45,23 +43,4 @@ public enum StompCommand {
RECEIPT,
ERROR;
public MessageType getMessageType() {
if (this == CONNECT) {
return MessageType.CONNECT;
}
else if (this == SUBSCRIBE) {
return MessageType.SUBSCRIBE;
}
else if (this == UNSUBSCRIBE) {
return MessageType.UNSUBSCRIBE;
}
else if (this == SEND) {
return MessageType.SEND;
}
else {
return MessageType.OTHER;
}
}
}

View File

@ -17,6 +17,7 @@
package org.springframework.web.messaging.stomp;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
@ -43,12 +44,12 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
// TODO: separate client from server headers so they can't be mixed
// Client
private static final String ACCEPT_VERSION = "accept-version";
private static final String ID = "id";
private static final String HOST = "host";
private static final String ACCEPT_VERSION = "accept-version";
// Server
private static final String MESSAGE_ID = "message-id";
@ -74,6 +75,11 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
private static final String HEARTBEAT = "heart-beat";
public static final List<String> STANDARD_HEADER_NAMES =
Arrays.asList(ID, HOST, ACCEPT_VERSION, MESSAGE_ID, RECEIPT_ID, SUBSCRIPTION,
VERSION, MESSAGE, ACK, DESTINATION, CONTENT_LENGTH, CONTENT_TYPE, HEARTBEAT);
private final Map<String, List<String>> headers;

View File

@ -36,6 +36,6 @@ public interface StompSession {
/**
* Register a task to be invoked if the underlying connection is closed.
*/
void registerConnectionClosedCallback(Runnable task);
void registerConnectionClosedTask(Runnable task);
}

View File

@ -1,202 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.PathMatcher;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompMessage;
import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractStompService {
protected final Log logger = LogFactory.getLog(getClass());
private final Reactor reactor;
private final List<String> allowedDestinations = new ArrayList<String>();
private final List<String> disallowedDestinations = new ArrayList<String>();
private final PathMatcher pathMatcher = new AntPathMatcher();
public AbstractStompService(Reactor reactor) {
Assert.notNull(reactor, "reactor is required");
this.reactor = reactor;
this.reactor.on(Fn.$(StompCommand.CONNECT), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processConnect(event.getData(), event.getReplyTo());
}
});
this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
StompMessage message = event.getData();
if (isAllowedDestination(message)) {
processSubscribe(event.getData(), event.getReplyTo());
}
}
});
this.reactor.on(Fn.$(StompCommand.SEND), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
StompMessage message = event.getData();
if (isAllowedDestination(message)) {
processSend(event.getData());
}
}
});
this.reactor.on(Fn.$(StompCommand.DISCONNECT), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processDisconnect(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.ACK), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processAck(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.NACK), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processNack(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.BEGIN), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processBegin(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.COMMIT), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processCommit(event.getData());
}
});
this.reactor.on(Fn.$(StompCommand.ABORT), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processAbort(event.getData());
}
});
this.reactor.on(Fn.$("CONNECTION_CLOSED"), new Consumer<Event<String>>() {
@Override
public void accept(Event<String> event) {
processConnectionClosed(event.getData());
}
});
}
/**
* Ant-style destination patterns that this STOMP service is allowed to process.
*/
public void setAllowedDestinations(String... patterns) {
this.allowedDestinations.clear();
this.allowedDestinations.addAll(Arrays.asList(patterns));
}
/**
* Ant-style destination patterns that this STOMP service should skip.
*/
public void setDisallowedDestinations(String... patterns) {
this.disallowedDestinations.clear();
this.disallowedDestinations.addAll(Arrays.asList(patterns));
}
public Reactor getReactor() {
return this.reactor;
}
private boolean isAllowedDestination(StompMessage message) {
String destination = message.getHeaders().getDestination();
if (destination == null) {
return true;
}
if (!this.disallowedDestinations.isEmpty()) {
for (String pattern : this.disallowedDestinations) {
if (this.pathMatcher.match(pattern, destination)) {
return false;
}
}
}
if (!this.allowedDestinations.isEmpty()) {
for (String pattern : this.allowedDestinations) {
if (this.pathMatcher.match(pattern, destination)) {
return true;
}
}
return false;
}
return true;
}
protected void processConnect(StompMessage message, Object replyTo) {
}
protected void processSubscribe(StompMessage message, Object replyTo) {
}
protected void processSend(StompMessage message) {
}
protected void processDisconnect(StompMessage message) {
}
protected void processAck(StompMessage message) {
}
protected void processNack(StompMessage message) {
}
protected void processBegin(StompMessage message) {
}
protected void processCommit(StompMessage message) {
}
protected void processAbort(StompMessage message) {
}
protected void processConnectionClosed(String sessionId) {
}
}

View File

@ -1,188 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Controller;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils.MethodFilter;
import org.springframework.web.messaging.MessageMapping;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.converter.StringMessageConverter;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.service.support.MessageBodyArgumentResolver;
import org.springframework.web.messaging.stomp.service.support.MessageBrokerArgumentResolver;
import org.springframework.web.messaging.stomp.service.support.SubscriptionArgumentResolver;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.method.HandlerMethodSelector;
import reactor.core.Reactor;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class AnnotationStompService extends AbstractStompService
implements ApplicationContextAware, InitializingBean {
private List<MessageConverter<?>> messageConverters;
private ApplicationContext applicationContext;
private Map<MessageMapping, HandlerMethod> handlerMethods = new HashMap<MessageMapping, HandlerMethod>();
private MessageMethodArgumentResolverComposite argumentResolvers = new MessageMethodArgumentResolverComposite();
public AnnotationStompService(Reactor reactor) {
super(reactor);
}
public void setMessageConverters(List<MessageConverter<?>> messageConverters) {
this.messageConverters = messageConverters;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void afterPropertiesSet() {
initHandlerMethods();
if (this.messageConverters == null) {
this.messageConverters = new ArrayList<MessageConverter<?>>();
this.messageConverters.add(new StringMessageConverter());
this.messageConverters.add(new MappingJackson2MessageConverter());
}
this.argumentResolvers.addResolver(new SubscriptionArgumentResolver(getReactor(), this.messageConverters));
this.argumentResolvers.addResolver(new MessageBrokerArgumentResolver(getReactor(), this.messageConverters));
this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters));
}
protected void initHandlerMethods() {
String[] beanNames = this.applicationContext.getBeanNamesForType(Object.class);
for (String beanName : beanNames) {
if (isHandler(this.applicationContext.getType(beanName))){
detectHandlerMethods(beanName);
}
}
}
protected boolean isHandler(Class<?> beanType) {
return ((AnnotationUtils.findAnnotation(beanType, Controller.class) != null) ||
(AnnotationUtils.findAnnotation(beanType, MessageMapping.class) != null));
}
protected void detectHandlerMethods(final Object handler) {
Class<?> handlerType = (handler instanceof String) ?
this.applicationContext.getType((String) handler) : handler.getClass();
final Class<?> userType = ClassUtils.getUserClass(handlerType);
Set<Method> methods = HandlerMethodSelector.selectMethods(userType, new MethodFilter() {
@Override
public boolean matches(Method method) {
return AnnotationUtils.findAnnotation(method, MessageMapping.class) != null;
}
});
for (Method method : methods) {
MessageMapping mapping = AnnotationUtils.findAnnotation(method, MessageMapping.class);
HandlerMethod handlerMethod = createHandlerMethod(handler, method);
this.handlerMethods.put(mapping, handlerMethod);
}
}
protected HandlerMethod createHandlerMethod(Object handler, Method method) {
HandlerMethod handlerMethod;
if (handler instanceof String) {
String beanName = (String) handler;
handlerMethod = new HandlerMethod(beanName, this.applicationContext, method);
}
else {
handlerMethod = new HandlerMethod(handler, method);
}
return handlerMethod;
}
protected HandlerMethod getHandlerMethod(String destination, MessageType messageType) {
for (MessageMapping mapping : this.handlerMethods.keySet()) {
boolean match = false;
for (String mappingDestination : mapping.value()) {
if (destination.equals(mappingDestination)) {
match = true;
break;
}
}
if (match && messageType.equals(mapping.messageType())) {
return this.handlerMethods.get(mapping);
}
}
return null;
}
@Override
protected void processSubscribe(StompMessage message, Object replyTo) {
handleMessage(message, replyTo, MessageType.SUBSCRIBE);
}
@Override
protected void processSend(StompMessage message) {
handleMessage(message, null, MessageType.SEND);
}
private void handleMessage(final StompMessage message, final Object replyTo, MessageType messageType) {
String destination = message.getHeaders().getDestination();
HandlerMethod match = getHandlerMethod(destination, messageType);
if (match == null) {
return;
}
HandlerMethod handlerMethod = match.createWithResolvedBean();
InvocableMessageHandlerMethod messageHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod);
messageHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
try {
messageHandlerMethod.invoke(message, replyTo);
}
catch (Throwable e) {
// TODO: send error message, or add @ExceptionHandler-like capability
e.printStackTrace();
}
}
}

View File

@ -1,120 +0,0 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.util.Assert;
import org.springframework.web.messaging.stomp.StompMessage;
/**
* Resolves method parameters by delegating to a list of registered
* {@link MessageMethodArgumentResolver}. Previously resolved method parameters are cached
* for faster lookups.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageMethodArgumentResolverComposite implements MessageMethodArgumentResolver {
protected final Log logger = LogFactory.getLog(getClass());
private final List<MessageMethodArgumentResolver> argumentResolvers =
new LinkedList<MessageMethodArgumentResolver>();
private final Map<MethodParameter, MessageMethodArgumentResolver> argumentResolverCache =
new ConcurrentHashMap<MethodParameter, MessageMethodArgumentResolver>(256);
/**
* Return a read-only list with the contained resolvers, or an empty list.
*/
public List<MessageMethodArgumentResolver> getResolvers() {
return Collections.unmodifiableList(this.argumentResolvers);
}
/**
* Whether the given {@linkplain MethodParameter method parameter} is supported by any registered
* {@link MessageMethodArgumentResolver}.
*/
@Override
public boolean supportsParameter(MethodParameter parameter) {
return getArgumentResolver(parameter) != null;
}
/**
* Iterate over registered {@link MessageMethodArgumentResolver}s and invoke the one that supports it.
* @exception IllegalStateException if no suitable {@link MessageMethodArgumentResolver} is found.
*/
@Override
public Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo) throws Exception {
MessageMethodArgumentResolver resolver = getArgumentResolver(parameter);
Assert.notNull(resolver, "Unknown parameter type [" + parameter.getParameterType().getName() + "]");
return resolver.resolveArgument(parameter, message, replyTo);
}
/**
* Find a registered {@link MessageMethodArgumentResolver} that supports the given method parameter.
*/
private MessageMethodArgumentResolver getArgumentResolver(MethodParameter parameter) {
MessageMethodArgumentResolver result = this.argumentResolverCache.get(parameter);
if (result == null) {
for (MessageMethodArgumentResolver methodArgumentResolver : this.argumentResolvers) {
if (logger.isTraceEnabled()) {
logger.trace("Testing if argument resolver [" + methodArgumentResolver + "] supports [" +
parameter.getGenericParameterType() + "]");
}
if (methodArgumentResolver.supportsParameter(parameter)) {
result = methodArgumentResolver;
this.argumentResolverCache.put(parameter, result);
break;
}
}
}
return result;
}
/**
* Add the given {@link MessageMethodArgumentResolver}.
*/
public MessageMethodArgumentResolverComposite addResolver(MessageMethodArgumentResolver argumentResolver) {
this.argumentResolvers.add(argumentResolver);
return this;
}
/**
* Add the given {@link MessageMethodArgumentResolver}s.
*/
public MessageMethodArgumentResolverComposite addResolvers(
List<? extends MessageMethodArgumentResolver> argumentResolvers) {
if (argumentResolvers != null) {
for (MessageMethodArgumentResolver resolver : argumentResolvers) {
this.argumentResolvers.add(resolver);
}
}
return this;
}
}

View File

@ -1,106 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
import reactor.fn.registry.Registration;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleStompService extends AbstractStompService {
private Map<String, List<Registration<?>>> subscriptionsBySession =
new ConcurrentHashMap<String, List<Registration<?>>>();
public SimpleStompService(Reactor reactor) {
super(reactor);
}
@Override
protected void processSubscribe(StompMessage message, final Object replyTo) {
if (logger.isDebugEnabled()) {
logger.debug("Subscribe " + message);
}
Registration<?> registration = getReactor().on(
Fn.$("destination:" + message.getHeaders().getDestination()),
new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> sendEvent) {
StompMessage inMessage = sendEvent.getData();
StompHeaders headers = new StompHeaders();
headers.setDestination(inMessage.getHeaders().getDestination());
StompMessage outMessage = new StompMessage(StompCommand.MESSAGE, headers, inMessage.getPayload());
getReactor().notify(replyTo, Event.wrap(outMessage));
}
});
addSubscription(message.getSessionId(), registration);
}
private void addSubscription(String sessionId, Registration<?> registration) {
List<Registration<?>> list = this.subscriptionsBySession.get(sessionId);
if (list == null) {
list = new ArrayList<Registration<?>>();
this.subscriptionsBySession.put(sessionId, list);
}
list.add(registration);
}
@Override
protected void processSend(StompMessage message) {
logger.debug("Message received: " + message);
String destination = message.getHeaders().getDestination();
getReactor().notify("destination:" + destination, Event.wrap(message));
}
@Override
protected void processDisconnect(StompMessage message) {
removeSubscriptions(message.getSessionId());
}
@Override
protected void processConnectionClosed(String sessionId) {
removeSubscriptions(sessionId);
}
private void removeSubscriptions(String sessionId) {
List<Registration<?>> registrations = this.subscriptionsBySession.remove(sessionId);
if (logger.isTraceEnabled()) {
logger.trace("Cancelling " + registrations.size() + " subscriptions for session=" + sessionId);
}
for (Registration<?> registration : registrations) {
registration.cancel();
}
}
}

View File

@ -1,80 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service.support;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.springframework.http.MediaType;
import org.springframework.util.CollectionUtils;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.service.MessageMethodArgumentResolver;
import reactor.core.Reactor;
import reactor.util.Assert;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractPayloadSendingArgumentResolver implements MessageMethodArgumentResolver {
private final Reactor reactor;
private final List<MessageConverter<?>> converters;
public AbstractPayloadSendingArgumentResolver(Reactor reactor, List<MessageConverter<?>> converters) {
Assert.notNull(reactor, "reactor is required");
this.reactor = reactor;
this.converters = (converters != null) ? converters : new ArrayList<MessageConverter<?>>();
}
public Reactor getReactor() {
return this.reactor;
}
public List<MessageConverter<?>> getMessageConverters() {
return this.converters;
}
@SuppressWarnings("unchecked")
protected byte[] convertToPayload(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
if (content == null) {
return null;
}
Class<? extends Object> clazz = content.getClass();
if (byte[].class.equals(clazz)) {
return (byte[]) content;
}
else if (!CollectionUtils.isEmpty(this.converters)) {
for (MessageConverter converter : getMessageConverters()) {
if (converter.canConvertToPayload(clazz, contentType)) {
return converter.convertToPayload(content, contentType);
}
}
}
throw new ContentTypeNotSupportedException(contentType, clazz);
}
}

View File

@ -1,78 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service.support;
import java.util.ArrayList;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.web.messaging.MessageBody;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.service.MessageMethodArgumentResolver;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageBodyArgumentResolver implements MessageMethodArgumentResolver {
private final List<MessageConverter<?>> converters;
public MessageBodyArgumentResolver(List<MessageConverter<?>> converters) {
this.converters = (converters != null) ? converters : new ArrayList<MessageConverter<?>>();
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return true;
}
@SuppressWarnings("unchecked")
@Override
public Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo)
throws Exception {
byte[] payload = message.getPayload();
Class<?> parameterType = parameter.getParameterType();
if (byte[].class.isAssignableFrom(parameterType)) {
return payload;
}
Object arg = null;
MessageBody annot = parameter.getParameterAnnotation(MessageBody.class);
MediaType contentType = message.getHeaders().getContentType();
if (annot == null || annot.required()) {
for (MessageConverter converter : this.converters) {
if (converter.canConvertFromPayload(parameterType, contentType)) {
return converter.convertFromPayload(parameterType, contentType, payload);
}
}
throw new ContentTypeNotSupportedException(contentType, parameterType);
}
return arg;
}
}

View File

@ -1,86 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service.support;
import java.io.IOException;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.web.messaging.MessageBroker;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import reactor.core.Reactor;
import reactor.fn.Event;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageBrokerArgumentResolver extends AbstractPayloadSendingArgumentResolver {
public MessageBrokerArgumentResolver(Reactor reactor, List<MessageConverter<?>> converters) {
super(reactor, converters);
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return MessageBroker.class.equals(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo) throws Exception {
return new DefaultMessageBroker(message.getSessionId());
}
private class DefaultMessageBroker implements MessageBroker {
private final String sessionId;
public DefaultMessageBroker(String sessionId) {
this.sessionId = sessionId;
}
@Override
public void send(String destination, Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
StompHeaders headers = new StompHeaders();
headers.setDestination(destination);
headers.setContentType(contentType);
byte[] payload = convertToPayload(content, contentType);
if (payload != null) {
// TODO: set content-length
}
StompMessage message = new StompMessage(StompCommand.SEND, headers, payload);
message.setSessionId(this.sessionId);
getReactor().notify(StompCommand.SEND, Event.wrap(message));
}
}
}

View File

@ -1,97 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service.support;
import java.io.IOException;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
import org.springframework.web.messaging.Subscription;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import reactor.core.Reactor;
import reactor.fn.Event;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SubscriptionArgumentResolver extends AbstractPayloadSendingArgumentResolver {
public SubscriptionArgumentResolver(Reactor reactor, List<MessageConverter<?>> converters) {
super(reactor, converters);
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return Subscription.class.equals(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo) throws Exception {
Assert.isTrue(StompCommand.SUBSCRIBE.equals(message.getCommand()), "Not a subscribe command");
return new DefaultSubscription(message.getHeaders().getDestination(), replyTo);
}
private class DefaultSubscription implements Subscription {
private final String destination;
private final Object replyTo;
public DefaultSubscription(String destination, Object replyTo) {
this.destination = destination;
this.replyTo = replyTo;
}
@Override
public void reply(Object content) throws IOException, ContentTypeNotSupportedException {
reply(content, null);
}
@Override
public void reply(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
StompHeaders headers = new StompHeaders();
headers.setDestination(this.destination);
headers.setContentType(contentType);
byte[] payload = convertToPayload(content, contentType);
if (payload != null) {
// TODO: set content-length
}
StompMessage message = new StompMessage(StompCommand.MESSAGE, headers, payload);
getReactor().notify(this.replyTo, Event.wrap(message));
}
}
}

View File

@ -24,18 +24,24 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.MediaType;
import org.springframework.messaging.GenericMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.CollectionUtils;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.event.EventBus;
import org.springframework.web.messaging.event.EventConsumer;
import org.springframework.web.messaging.event.EventRegistration;
import org.springframework.web.messaging.service.AbstractMessageService;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompException;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.StompSession;
import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
import reactor.fn.registry.Registration;
import org.springframework.web.messaging.stomp.support.StompHeaderMapper;
/**
* @author Gary Russell
@ -47,60 +53,82 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
private static Log logger = LogFactory.getLog(DefaultStompWebSocketHandler.class);
private final Reactor reactor;
private final EventBus eventBus;
private Map<String, List<Registration<?>>> registrationsBySession =
new ConcurrentHashMap<String, List<Registration<?>>>();
private MessageConverter payloadConverter = new CompositeMessageConverter(null);
private final StompHeaderMapper headerMapper = new StompHeaderMapper();
private Map<String, List<EventRegistration>> registrationsBySession =
new ConcurrentHashMap<String, List<EventRegistration>>();
public DefaultStompWebSocketHandler(Reactor reactor) {
this.reactor = reactor;
public DefaultStompWebSocketHandler(EventBus eventBus) {
this.eventBus = eventBus;
}
public void handleStompMessage(StompSession session, StompMessage message) {
public void setMessageConverters(List<MessageConverter> converters) {
this.payloadConverter = new CompositeMessageConverter(converters);
}
public void handleStompMessage(final StompSession session, StompMessage stompMessage) {
if (logger.isTraceEnabled()) {
logger.trace("Processing: " + stompMessage);
}
try {
StompCommand command = message.getCommand();
MessageType messageType = MessageType.OTHER;
String replyKey = null;
StompCommand command = stompMessage.getCommand();
if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
registerConnectionClosedCallback(session);
handleConnect(session, message);
}
else if (StompCommand.SUBSCRIBE.equals(command)) {
handleSubscribe(session, message);
}
else if (StompCommand.UNSUBSCRIBE.equals(command)) {
handleUnsubscribe(session, message);
session.registerConnectionClosedTask(new ConnectionClosedTask(session));
messageType = MessageType.CONNECT;
replyKey = handleConnect(session, stompMessage);
}
else if (StompCommand.SEND.equals(command)) {
handleSend(session, message);
messageType = MessageType.MESSAGE;
handleSend(session, stompMessage);
}
else if (StompCommand.SUBSCRIBE.equals(command)) {
messageType = MessageType.SUBSCRIBE;
replyKey = handleSubscribe(session, stompMessage);
}
else if (StompCommand.UNSUBSCRIBE.equals(command)) {
messageType = MessageType.UNSUBSCRIBE;
handleUnsubscribe(session, stompMessage);
}
else if (StompCommand.DISCONNECT.equals(command)) {
handleDisconnect(session, message);
}
else if (StompCommand.ACK.equals(command) || StompCommand.NACK.equals(command)) {
this.reactor.notify(command, Event.wrap(message));
}
else if (StompCommand.BEGIN.equals(command) || StompCommand.COMMIT.equals(command) || StompCommand.ABORT.equals(command)) {
this.reactor.notify(command, Event.wrap(message));
messageType = MessageType.DISCONNECT;
handleDisconnect(session, stompMessage);
}
else {
sendErrorMessage(session, "Invalid STOMP command " + command);
return;
}
Map<String, Object> messageHeaders = this.headerMapper.toMessageHeaders(stompMessage.getHeaders());
messageHeaders.put("messageType", messageType);
if (replyKey != null) {
messageHeaders.put(MessageHeaders.REPLY_CHANNEL, replyKey);
}
messageHeaders.put("stompCommand", command);
messageHeaders.put("sessionId", session.getId());
Message<byte[]> genericMessage = new GenericMessage<byte[]>(stompMessage.getPayload(), messageHeaders);
if (logger.isTraceEnabled()) {
logger.trace("Sending notification: " + genericMessage);
}
this.eventBus.send(AbstractMessageService.MESSAGE_KEY, genericMessage);
}
catch (Throwable t) {
handleError(session, t);
}
}
private void registerConnectionClosedCallback(final StompSession session) {
session.registerConnectionClosedCallback(new Runnable() {
@Override
public void run() {
removeSubscriptions(session);
reactor.notify("CONNECTION_CLOSED", Event.wrap(session.getId()));
}
});
}
private void handleError(final StompSession session, Throwable t) {
logger.error("Terminating STOMP session due to failure to send message: ", t);
sendErrorMessage(session, t.getMessage());
@ -121,7 +149,7 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
}
}
protected void handleConnect(final StompSession session, StompMessage stompMessage) throws IOException {
protected String handleConnect(final StompSession session, StompMessage stompMessage) throws IOException {
StompHeaders headers = new StompHeaders();
Set<String> acceptVersions = stompMessage.getHeaders().getAcceptVersion();
@ -144,61 +172,75 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
session.sendMessage(new StompMessage(StompCommand.CONNECTED, headers));
String replyToKey = "relay-message" + session.getId();
String replyKey = "relay-message" + session.getId();
Registration<?> registration = this.reactor.on(Fn.$(replyToKey), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
try {
StompMessage message = event.getData();
if (StompCommand.CONNECTED.equals(message.getCommand())) {
// TODO: skip for now (we already sent CONNECTED)
return;
EventRegistration registration = this.eventBus.registerConsumer(replyKey,
new EventConsumer<StompMessage>() {
@Override
public void accept(StompMessage message) {
try {
if (StompCommand.CONNECTED.equals(message.getCommand())) {
// TODO: skip for now (we already sent CONNECTED)
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Relaying back to client: " + message);
}
session.sendMessage(message);
}
catch (Throwable t) {
handleError(session, t);
}
}
if (logger.isTraceEnabled()) {
logger.trace("Relaying back to client: " + message);
}
session.sendMessage(message);
}
catch (Throwable t) {
handleError(session, t);
}
}
});
});
addRegistration(session, registration);
addRegistration(session.getId(), registration);
this.reactor.notify(StompCommand.CONNECT, Event.wrap(stompMessage, replyToKey));
return replyKey;
}
protected void handleSubscribe(final StompSession session, StompMessage message) {
protected String handleSubscribe(final StompSession session, StompMessage message) {
final String subscriptionId = message.getHeaders().getId();
String replyToKey = getSubscriptionReplyKey(session, subscriptionId);
String replyKey = getSubscriptionReplyKey(session, subscriptionId);
// TODO: extract and remember "ack" mode
// http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_ack_Header
if (logger.isTraceEnabled()) {
logger.trace("Adding subscription, key=" + replyToKey);
logger.trace("Adding subscription, key=" + replyKey);
}
Registration<?> registration = this.reactor.on(Fn.$(replyToKey), new Consumer<Event<StompMessage>>() {
EventRegistration registration = this.eventBus.registerConsumer(replyKey, new EventConsumer<Message<?>>() {
@Override
public void accept(Event<StompMessage> event) {
event.getData().getHeaders().setSubscription(subscriptionId);
public void accept(Message<?> replyMessage) {
StompHeaders headers = new StompHeaders();
headers.setSubscription(subscriptionId);
headerMapper.fromMessageHeaders(replyMessage.getHeaders(), headers);
byte[] payload;
try {
session.sendMessage(event.getData());
MediaType contentType = headers.getContentType();
payload = payloadConverter.convertToPayload(replyMessage.getPayload(), contentType);
}
catch (Exception e) {
logger.error("Failed to send " + replyMessage, e);
return;
}
try {
StompMessage stompMessage = new StompMessage(StompCommand.MESSAGE, headers, payload);
session.sendMessage(stompMessage);
}
catch (Throwable t) {
handleError(session, t);
}
}
});
addRegistration(session, registration);
addRegistration(session.getId(), registration);
this.reactor.notify(StompCommand.SUBSCRIBE, Event.wrap(message, replyToKey));
return replyKey;
// TODO: need a way to communicate back if subscription was successfully created or
// not in which case an ERROR should be sent back and close the connection
@ -209,10 +251,11 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
return StompCommand.SUBSCRIBE + ":" + session.getId() + ":" + subscriptionId;
}
private void addRegistration(String sessionId, Registration<?> registration) {
List<Registration<?>> list = this.registrationsBySession.get(sessionId);
private void addRegistration(StompSession session, EventRegistration registration) {
String sessionId = session.getId();
List<EventRegistration> list = this.registrationsBySession.get(sessionId);
if (list == null) {
list = new ArrayList<Registration<?>>();
list = new ArrayList<EventRegistration>();
this.registrationsBySession.put(sessionId, list);
}
list.add(registration);
@ -220,14 +263,13 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
protected void handleUnsubscribe(StompSession session, StompMessage message) {
cancelRegistration(session, message.getHeaders().getId());
this.reactor.notify(StompCommand.UNSUBSCRIBE, Event.wrap(message));
}
private void cancelRegistration(StompSession session, String subscriptionId) {
String key = getSubscriptionReplyKey(session, subscriptionId);
List<Registration<?>> list = this.registrationsBySession.get(session.getId());
for (Registration<?> registration : list) {
if (registration.getSelector().matches(key)) {
List<EventRegistration> list = this.registrationsBySession.get(session.getId());
for (EventRegistration registration : list) {
if (registration.getRegistrationKey().equals(key)) {
if (logger.isDebugEnabled()) {
logger.debug("Cancelling subscription, key=" + key);
}
@ -238,27 +280,44 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
}
protected void handleSend(StompSession session, StompMessage stompMessage) {
this.reactor.notify(StompCommand.SEND, Event.wrap(stompMessage));
}
protected void handleDisconnect(StompSession session, StompMessage stompMessage) {
removeSubscriptions(session);
this.reactor.notify(StompCommand.DISCONNECT, Event.wrap(stompMessage));
}
private boolean removeSubscriptions(StompSession session) {
String sessionId = session.getId();
List<Registration<?>> registrations = this.registrationsBySession.remove(sessionId);
List<EventRegistration> registrations = this.registrationsBySession.remove(sessionId);
if (CollectionUtils.isEmpty(registrations)) {
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Cancelling " + registrations.size() + " subscriptions for session=" + sessionId);
}
for (Registration<?> registration : registrations) {
for (EventRegistration registration : registrations) {
registration.cancel();
}
return true;
}
private final class ConnectionClosedTask implements Runnable {
private final StompSession session;
private ConnectionClosedTask(StompSession session) {
this.session = session;
}
@Override
public void run() {
removeSubscriptions(session);
if (logger.isTraceEnabled()) {
logger.trace("Sending notification for closed connection: " + session.getId());
}
eventBus.send(AbstractMessageService.CLIENT_CONNECTION_CLOSED_KEY, session.getId());
}
}
}

View File

@ -74,7 +74,7 @@ public class WebSocketStompSession implements StompSession {
}
}
public void registerConnectionClosedCallback(Runnable task) {
public void registerConnectionClosedTask(Runnable task) {
this.connectionClosedTasks.add(task);
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
package org.springframework.web.messaging.stomp.support;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -23,18 +23,23 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.event.EventBus;
import org.springframework.web.messaging.service.AbstractMessageService;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.support.StompMessageConverter;
import reactor.core.Reactor;
import reactor.fn.Event;
import reactor.util.Assert;
@ -43,57 +48,77 @@ import reactor.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class RelayStompService extends AbstractStompService {
public class RelayStompService extends AbstractMessageService {
private Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();
private final StompMessageConverter messageConverter = new StompMessageConverter();
private MessageConverter payloadConverter;
private final TaskExecutor taskExecutor;
private Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();
public RelayStompService(Reactor reactor, TaskExecutor executor) {
super(reactor);
private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
private final StompHeaderMapper stompHeaderMapper = new StompHeaderMapper();
public RelayStompService(EventBus eventBus, TaskExecutor executor) {
super(eventBus);
this.taskExecutor = executor; // For now, a naive way to manage socket reading
this.payloadConverter = new CompositeMessageConverter(null);
}
protected void processConnect(StompMessage stompMessage, final Object replyTo) {
public void setMessageConverters(List<MessageConverter> converters) {
this.payloadConverter = new CompositeMessageConverter(converters);
}
final String stompSessionId = stompMessage.getSessionId();
protected void processConnect(Message<?> message) {
final RelaySession session = new RelaySession();
this.relaySessions.put(stompSessionId, session);
String sessionId = (String) message.getHeaders().get("sessionId");
RelaySession session = new RelaySession();
this.relaySessions.put(sessionId, session);
try {
Socket socket = SocketFactory.getDefault().createSocket("127.0.0.1", 61613);
session.setSocket(socket);
relayStompMessage(stompMessage);
forwardMessage(message, StompCommand.CONNECT);
taskExecutor.execute(new RelayReadTask(stompSessionId, replyTo, session));
String replyTo = (String) message.getHeaders().getReplyChannel();
RelayReadTask readTask = new RelayReadTask(sessionId, replyTo, session);
this.taskExecutor.execute(readTask);
}
catch (Throwable t) {
t.printStackTrace();
clearRelaySession(stompSessionId);
clearRelaySession(sessionId);
}
}
private void relayStompMessage(StompMessage stompMessage) {
RelaySession session = RelayStompService.this.relaySessions.get(stompMessage.getSessionId());
private void forwardMessage(Message<?> message, StompCommand command) {
String sessionId = (String) message.getHeaders().get("sessionId");
RelaySession session = RelayStompService.this.relaySessions.get(sessionId);
Assert.notNull(session, "RelaySession not found");
try {
StompHeaders stompHeaders = new StompHeaders();
this.stompHeaderMapper.fromMessageHeaders(message.getHeaders(), stompHeaders);
MediaType contentType = stompHeaders.getContentType();
byte[] payload = this.payloadConverter.convertToPayload(message.getPayload(), contentType);
StompMessage stompMessage = new StompMessage(command, stompHeaders, payload);
if (logger.isTraceEnabled()) {
logger.trace("Forwarding: " + stompMessage);
}
byte[] bytes = messageConverter.fromStompMessage(stompMessage);
session.getOutputStream().write(bytes);
byte[] bytesToWrite = this.stompMessageConverter.fromStompMessage(stompMessage);
session.getOutputStream().write(bytesToWrite);
session.getOutputStream().flush();
}
catch (Exception e) {
e.printStackTrace();
clearRelaySession(stompMessage.getSessionId());
catch (Exception ex) {
logger.error("Couldn't forward message", ex);
clearRelaySession(sessionId);
}
}
@ -111,47 +136,34 @@ public class RelayStompService extends AbstractStompService {
}
@Override
protected void processSubscribe(StompMessage message, Object replyTo) {
relayStompMessage(message);
protected void processMessage(Message<?> message) {
forwardMessage(message, StompCommand.SEND);
}
@Override
protected void processSend(StompMessage message) {
relayStompMessage(message);
protected void processSubscribe(Message<?> message) {
forwardMessage(message, StompCommand.SUBSCRIBE);
}
@Override
protected void processDisconnect(StompMessage message) {
relayStompMessage(message);
protected void processUnsubscribe(Message<?> message) {
forwardMessage(message, StompCommand.UNSUBSCRIBE);
}
@Override
protected void processAck(StompMessage message) {
relayStompMessage(message);
protected void processDisconnect(Message<?> message) {
forwardMessage(message, StompCommand.DISCONNECT);
}
@Override
protected void processNack(StompMessage message) {
relayStompMessage(message);
protected void processOther(Message<?> message) {
StompCommand command = (StompCommand) message.getHeaders().get("stompCommand");
Assert.notNull(command, "Expected STOMP command: " + message.getHeaders());
forwardMessage(message, command);
}
@Override
protected void processBegin(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processCommit(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processAbort(StompMessage message) {
relayStompMessage(message);
}
@Override
protected void processConnectionClosed(String sessionId) {
protected void processClientConnectionClosed(String sessionId) {
if (logger.isDebugEnabled()) {
logger.debug("Client connection closed for STOMP session=" + sessionId + ". Clearing relay session.");
}
@ -190,10 +202,10 @@ public class RelayStompService extends AbstractStompService {
private final class RelayReadTask implements Runnable {
private final String stompSessionId;
private final Object replyTo;
private final String replyTo;
private final RelaySession session;
private RelayReadTask(String stompSessionId, Object replyTo, RelaySession session) {
private RelayReadTask(String stompSessionId, String replyTo, RelaySession session) {
this.stompSessionId = stompSessionId;
this.replyTo = replyTo;
this.session = session;
@ -210,8 +222,8 @@ public class RelayStompService extends AbstractStompService {
}
else if (b == 0x00) {
byte[] bytes = out.toByteArray();
StompMessage message = RelayStompService.this.messageConverter.toStompMessage(bytes);
getReactor().notify(replyTo, Event.wrap(message));
StompMessage message = RelayStompService.this.stompMessageConverter.toStompMessage(bytes);
getEventBus().send(this.replyTo, message);
out.reset();
}
else {
@ -231,7 +243,7 @@ public class RelayStompService extends AbstractStompService {
StompHeaders headers = new StompHeaders();
headers.setMessage(message);
StompMessage errorMessage = new StompMessage(StompCommand.ERROR, headers);
getReactor().notify(replyTo, Event.wrap(errorMessage));
getEventBus().send(this.replyTo, Event.wrap(errorMessage));
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.support;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.MediaType;
import org.springframework.messaging.MessageHeaders;
import org.springframework.web.messaging.stomp.StompHeaders;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompHeaderMapper {
private static Log logger = LogFactory.getLog(StompHeaderMapper.class);
private static final String[][] stompHeaderNames;
static {
stompHeaderNames = new String[2][StompHeaders.STANDARD_HEADER_NAMES.size()];
for (int i=0 ; i < StompHeaders.STANDARD_HEADER_NAMES.size(); i++) {
stompHeaderNames[0][i] = StompHeaders.STANDARD_HEADER_NAMES.get(i);
stompHeaderNames[1][i] = "stomp." + StompHeaders.STANDARD_HEADER_NAMES.get(i);
}
}
public Map<String, Object> toMessageHeaders(StompHeaders stompHeaders) {
Map<String, Object> headers = new HashMap<String, Object>();
// prefixed STOMP headers
for (int i=0; i < stompHeaderNames[0].length; i++) {
String header = stompHeaderNames[0][i];
if (stompHeaders.containsKey(header)) {
String prefixedHeader = stompHeaderNames[1][i];
headers.put(prefixedHeader, stompHeaders.getFirst(header));
}
}
// for generic use (not-prefixed)
if (stompHeaders.getDestination() != null) {
headers.put("destination", stompHeaders.getDestination());
}
if (stompHeaders.getContentType() != null) {
headers.put("content-type", stompHeaders.getContentType());
}
return headers;
}
public void fromMessageHeaders(MessageHeaders messageHeaders, StompHeaders stompHeaders) {
// prefixed STOMP headers
for (int i=0; i < stompHeaderNames[0].length; i++) {
String prefixedHeader = stompHeaderNames[1][i];
if (messageHeaders.containsKey(prefixedHeader)) {
String header = stompHeaderNames[0][i];
stompHeaders.add(header, (String) messageHeaders.get(prefixedHeader));
}
}
// generic (not prefixed)
String destination = (String) messageHeaders.get("destination");
if (destination != null) {
stompHeaders.setDestination(destination);
}
Object contentType = messageHeaders.get("content-type");
if (contentType != null) {
if (contentType instanceof String) {
stompHeaders.setContentType(MediaType.valueOf((String) contentType));
}
else if (contentType instanceof MediaType) {
stompHeaders.setContentType((MediaType) contentType);
}
else {
logger.warn("Invalid contentType class: " + contentType.getClass());
}
}
}
}

View File

@ -51,7 +51,7 @@ public final class TextMessage extends WebSocketMessage<String> {
@Override
protected String toStringPayload() {
return (getPayloadSize() > 25) ? getPayload().substring(0, 25) + "..." : getPayload();
return (getPayloadSize() > 80) ? getPayload().substring(0, 80) + "..." : getPayload();
}
}

View File

@ -1,78 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import org.junit.Before;
import org.junit.Test;
import org.springframework.stereotype.Controller;
import org.springframework.web.context.support.StaticWebApplicationContext;
import org.springframework.web.messaging.MessageMapping;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.method.HandlerMethod;
import reactor.core.Environment;
import reactor.core.Reactor;
import static org.junit.Assert.*;
/**
* Test fixture for {@link AnnotationStompService}.
* @author Rossen Stoyanchev
*/
public class AnnotationStompServiceTests {
private AnnotationStompService service;
@Before
public void setup() {
StaticWebApplicationContext wac = new StaticWebApplicationContext();
wac.registerSingleton("controller", TestController.class);
Reactor reactor = new Reactor(new Environment());
this.service = new AnnotationStompService(reactor);
this.service.setApplicationContext(wac);
this.service.afterPropertiesSet();
}
@Test
public void noMatch() {
assertNull(this.service.getHandlerMethod("/nomatch", MessageType.CONNECT));
}
@Test
public void match() {
HandlerMethod handlerMethod = this.service.getHandlerMethod("/foo", MessageType.SUBSCRIBE);
assertNotNull(handlerMethod);
assertEquals("handleSubscribe", handlerMethod.getMethod().getName());
assertEquals("controller", handlerMethod.getBean());
}
@Controller
private static class TestController {
@MessageMapping(value="/foo", messageType=MessageType.SUBSCRIBE)
public void handleSubscribe() {
}
}
}