Update MessageConverter and reactor dependencies

This commit is contained in:
Rossen Stoyanchev 2013-07-10 16:00:40 -04:00
parent dbc904b647
commit 2803845151
22 changed files with 205 additions and 972 deletions

View File

@ -301,7 +301,9 @@ project("spring-context") {
optional("org.hibernate:hibernate-validator:4.3.0.Final") optional("org.hibernate:hibernate-validator:4.3.0.Final")
optional("org.aspectj:aspectjweaver:${aspectjVersion}") optional("org.aspectj:aspectjweaver:${aspectjVersion}")
optional("org.apache.geronimo.specs:geronimo-jta_1.1_spec:1.1") optional("org.apache.geronimo.specs:geronimo-jta_1.1_spec:1.1")
optional("reactor:reactor-core:1.0.0.BUILD-SNAPSHOT") optional("com.fasterxml.jackson.core:jackson-databind:2.2.0")
optional("org.projectreactor:reactor-core:1.0.0.BUILD-SNAPSHOT")
optional("com.lmax:disruptor:3.1.1")
testCompile("commons-dbcp:commons-dbcp:1.2.2") testCompile("commons-dbcp:commons-dbcp:1.2.2")
testCompile("javax.inject:javax.inject-tck:1") testCompile("javax.inject:javax.inject-tck:1")
} }
@ -490,8 +492,9 @@ project("spring-websocket") {
optional("org.eclipse.jetty.websocket:websocket-server:9.0.4.v20130625") optional("org.eclipse.jetty.websocket:websocket-server:9.0.4.v20130625")
optional("org.eclipse.jetty.websocket:websocket-client:9.0.4.v20130625") optional("org.eclipse.jetty.websocket:websocket-client:9.0.4.v20130625")
optional("com.fasterxml.jackson.core:jackson-databind:2.2.0") // required for SockJS support currently optional("com.fasterxml.jackson.core:jackson-databind:2.2.0") // required for SockJS support currently
optional("reactor:reactor-core:1.0.0.BUILD-SNAPSHOT") // STOMP message processing optional("org.projectreactor:reactor-core:1.0.0.BUILD-SNAPSHOT")
optional("reactor:reactor-tcp:1.0.0.BUILD-SNAPSHOT") // STOMP relay to message broker optional("org.projectreactor:reactor-tcp:1.0.0.BUILD-SNAPSHOT")
optional("com.lmax:disruptor:3.1.1")
} }
repositories { repositories {

View File

@ -26,10 +26,10 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
import reactor.core.Reactor; import reactor.core.Reactor;
import reactor.fn.Consumer; import reactor.event.Event;
import reactor.fn.Event; import reactor.event.registry.Registration;
import reactor.fn.registry.Registration; import reactor.event.selector.ObjectSelector;
import reactor.fn.selector.ObjectSelector; import reactor.function.Consumer;
/** /**

View File

@ -0,0 +1,116 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.converter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.reflect.Type;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Rossen Stoyanchev
* @sicne 4.0
*/
public class MappingJackson2MessageConverter implements MessageConverter<Object> {
private ObjectMapper objectMapper = new ObjectMapper();
private Type defaultObjectType = Map.class;
private Class<?> defaultMessagePayloadClass = byte[].class;
/**
* Set the default target Object class to convert to in
* {@link #fromMessage(Message, Class)}.
*/
public void setDefaultObjectClass(Type defaultObjectType) {
Assert.notNull(defaultObjectType, "defaultObjectType is required");
this.defaultObjectType = defaultObjectType;
}
/**
* Set the type of Message payload to convert to in {@link #toMessage(Object)}.
* @param payloadClass either byte[] or String
*/
public void setDefaultTargetPayloadClass(Class<?> payloadClass) {
Assert.isTrue(byte[].class.equals(payloadClass) || String.class.equals(payloadClass),
"Payload class must be byte[] or String: " + payloadClass);
this.defaultMessagePayloadClass = payloadClass;
}
@Override
public Object fromMessage(Message<?> message, Type objectType) {
JavaType javaType = (objectType != null) ?
this.objectMapper.constructType(objectType) :
this.objectMapper.constructType(this.defaultObjectType);
Object payload = message.getPayload();
try {
if (payload instanceof byte[]) {
return this.objectMapper.readValue((byte[]) payload, javaType);
}
else if (payload instanceof String) {
return this.objectMapper.readValue((String) payload, javaType);
}
else {
throw new IllegalArgumentException("Unexpected message payload type: " + payload);
}
}
catch (IOException ex) {
throw new MessageConversionException("Could not read JSON: " + ex.getMessage(), ex);
}
}
@SuppressWarnings("unchecked")
@Override
public <P> Message<P> toMessage(Object object) {
P payload;
try {
if (byte[].class.equals(this.defaultMessagePayloadClass)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
this.objectMapper.writeValue(out, object);
payload = (P) out.toByteArray();
}
else if (String.class.equals(this.defaultMessagePayloadClass)) {
Writer writer = new StringWriter();
this.objectMapper.writeValue(writer, object);
payload = (P) writer.toString();
}
else {
// Should never happen..
throw new IllegalStateException("Unexpected payload class: " + defaultMessagePayloadClass);
}
}
catch (IOException ex) {
throw new MessageConversionException("Could not write JSON: " + ex.getMessage(), ex);
}
return MessageBuilder.withPayload(payload).build();
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.springframework.web.messaging.converter; package org.springframework.messaging.converter;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;

View File

@ -16,6 +16,8 @@
package org.springframework.messaging.converter; package org.springframework.messaging.converter;
import java.lang.reflect.Type;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
@ -23,10 +25,10 @@ import org.springframework.messaging.Message;
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
*/ */
public interface MessageConverter { public interface MessageConverter<T> {
<T> Message<?> toMessage(T object); <P> Message<P> toMessage(T object);
<T> T fromMessage(Message<?> message); T fromMessage(Message<?> message, Type targetClass);
} }

View File

@ -15,6 +15,8 @@
*/ */
package org.springframework.messaging.converter; package org.springframework.messaging.converter;
import java.lang.reflect.Type;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
@ -23,19 +25,16 @@ import org.springframework.messaging.support.MessageBuilder;
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
*/ */
public class DefaultMessageConverter implements MessageConverter { public class SimplePayloadMessageConverter implements MessageConverter<Object> {
@Override @Override
public <T> Message<?> toMessage(T object) { public Message<Object> toMessage(Object object) {
System.out.println("converting " + object + " to message");
return MessageBuilder.withPayload(object).build(); return MessageBuilder.withPayload(object).build();
} }
@Override @Override
@SuppressWarnings("unchecked") public Object fromMessage(Message<?> message, Type targetClass) {
public <T> T fromMessage(Message<?> message) { return message.getPayload();
System.out.println("converting " + message + " to object");
return (T) message.getPayload();
} }
} }

View File

@ -19,7 +19,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.DefaultMessageConverter; import org.springframework.messaging.converter.SimplePayloadMessageConverter;
import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -34,7 +34,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
private volatile D defaultDestination; private volatile D defaultDestination;
protected volatile MessageConverter converter = new DefaultMessageConverter(); protected volatile MessageConverter converter = new SimplePayloadMessageConverter();
public void setDefaultDestination(D defaultDestination) { public void setDefaultDestination(D defaultDestination) {
@ -44,7 +44,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
/** /**
* Set the {@link MessageConverter} that is to be used to convert * Set the {@link MessageConverter} that is to be used to convert
* between Messages and objects for this template. * between Messages and objects for this template.
* <p>The default is {@link DefaultMessageConverter}. * <p>The default is {@link SimplePayloadMessageConverter}.
*/ */
public void setMessageConverter(MessageConverter messageConverter) { public void setMessageConverter(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' must not be null"); Assert.notNull(messageConverter, "'messageConverter' must not be null");

View File

@ -46,8 +46,8 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
@Override @Override
public Object receiveAndConvert(D destination) { public Object receiveAndConvert(D destination) {
Message<Object> message = this.doReceive(destination); Message<?> message = this.doReceive(destination);
return (message != null) ? this.converter.fromMessage(message) : null; return (message != null) ? this.converter.fromMessage(message, null) : null;
} }
@ -86,7 +86,7 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
requestMessage = postProcessor.postProcessMessage(requestMessage); requestMessage = postProcessor.postProcessMessage(requestMessage);
} }
Message<?> replyMessage = this.sendAndReceive(destination, requestMessage); Message<?> replyMessage = this.sendAndReceive(destination, requestMessage);
return this.converter.fromMessage(replyMessage); return this.converter.fromMessage(replyMessage, null);
} }
} }

View File

@ -1,193 +0,0 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
/**
* Abstract base class for most {@link MessageConverter} implementations.
*
* <p>This base class adds support for setting supported {@code MediaTypes}, through the
* {@link #setSupportedMediaTypes(List) supportedMediaTypes} property.
*
* @author Rossen Stoyanchev
* @author Arjen Poutsma
* @since 4.0
*/
public abstract class AbstractMessageConverter implements MessageConverter {
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
private List<MediaType> supportedMediaTypes = Collections.emptyList();
/**
* Construct an {@code AbstractMessageConverter} with no supported media types.
* @see #setSupportedMediaTypes
*/
protected AbstractMessageConverter() {
}
/**
* Construct an {@code AbstractMessageConverter} with one supported media type.
*
* @param supportedMediaType the supported media type
*/
protected AbstractMessageConverter(MediaType supportedMediaType) {
setSupportedMediaTypes(Collections.singletonList(supportedMediaType));
}
/**
* Construct an {@code AbstractMessageConverter} with multiple supported media type.
*
* @param supportedMediaTypes the supported media types
*/
protected AbstractMessageConverter(MediaType... supportedMediaTypes) {
setSupportedMediaTypes(Arrays.asList(supportedMediaTypes));
}
/**
* Set the list of {@link MediaType} objects supported by this converter.
*/
public void setSupportedMediaTypes(List<MediaType> supportedMediaTypes) {
Assert.notEmpty(supportedMediaTypes, "'supportedMediaTypes' must not be empty");
this.supportedMediaTypes = new ArrayList<MediaType>(supportedMediaTypes);
}
public List<MediaType> getSupportedMediaTypes() {
return Collections.unmodifiableList(this.supportedMediaTypes);
}
/**
* This implementation checks if the given class is {@linkplain #supports(Class)
* supported}, and if the {@linkplain #getSupportedMediaTypes() supported media types}
* {@linkplain MediaType#includes(MediaType) include} the given media type.
*/
@Override
public boolean canConvertFromPayload(Class<?> clazz, MediaType mediaType) {
return supports(clazz) && canConvertFrom(mediaType);
}
/**
* Indicates whether the given class is supported by this converter.
* @param clazz the class to test for support
* @return {@code true} if supported; {@code false} otherwise
*/
protected abstract boolean supports(Class<?> clazz);
/**
* Returns true if any of the {@linkplain #setSupportedMediaTypes(List) supported
* media types} include the given media type.
*
* @param mediaType the media type to read, can be {@code null} if not specified.
* Typically the value of a {@code Content-Type} header.
* @return {@code true} if the supported media types include the media type, or if the
* media type is {@code null}
*/
protected boolean canConvertFrom(MediaType mediaType) {
if (mediaType == null) {
return true;
}
for (MediaType supportedMediaType : getSupportedMediaTypes()) {
if (supportedMediaType.includes(mediaType)) {
return true;
}
}
return false;
}
/**
* This implementation checks if the given class is {@linkplain #supports(Class)
* supported}, and if the {@linkplain #getSupportedMediaTypes() supported media types}
* {@linkplain MediaType#includes(MediaType) include} the given media type.
*/
@Override
public boolean canConvertToPayload(Class<?> clazz, MediaType mediaType) {
return supports(clazz) && canConvertTo(mediaType);
}
/**
* Returns {@code true} if the given media type includes any of the
* {@linkplain #setSupportedMediaTypes(List) supported media types}.
*
* @param mediaType the media type to write, can be {@code null} if not specified.
* Typically the value of an {@code Accept} header.
* @return {@code true} if the supported media types are compatible with the media
* type, or if the media type is {@code null}
*/
protected boolean canConvertTo(MediaType mediaType) {
if (mediaType == null || MediaType.ALL.equals(mediaType)) {
return true;
}
for (MediaType supportedMediaType : getSupportedMediaTypes()) {
if (supportedMediaType.isCompatibleWith(mediaType)) {
return true;
}
}
return false;
}
/**
* This implementation simply delegates to
* {@link #convertFromPayloadInternal(Class, MediaType, byte[])}. Future
* implementations might add some default behavior, however.
*/
@Override
public Object convertFromPayload(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException, ContentTypeNotSupportedException {
return convertFromPayloadInternal(clazz, contentType, payload);
}
/**
* Abstract template method that reads the actual object. Invoked from {@link #read}.
* @param clazz the type of object to return
* @param contentType
* @param payload the content to convert from
* @return the converted object
* @throws IOException in case of I/O errors
*/
protected abstract Object convertFromPayloadInternal(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException, ContentTypeNotSupportedException;
/**
* This implementation simply delegates to
* {@link #convertToPayloadInternal(Object, MediaType)}. Future
* implementations might add some default behavior, however.
*/
@Override
public byte[] convertToPayload(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
return convertToPayloadInternal(content, contentType);
}
protected abstract byte[] convertToPayloadInternal(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException;
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import org.springframework.http.MediaType;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ByteArrayMessageConverter implements MessageConverter {
@Override
public boolean canConvertFromPayload(Class<?> clazz, MediaType contentType) {
return byte[].class.equals(clazz);
}
@Override
public Object convertFromPayload(Class<?> clazz, MediaType contentType, byte[] payload) {
return payload;
}
@Override
public boolean canConvertToPayload(Class<?> clazz, MediaType mediaType) {
return byte[].class.equals(clazz);
}
@Override
public byte[] convertToPayload(Object content, MediaType contentType) {
return (byte[]) content;
}
}

View File

@ -1,99 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.springframework.http.MediaType;
import org.springframework.util.ClassUtils;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class CompositeMessageConverter implements MessageConverter {
private static final boolean jackson2Present =
ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", CompositeMessageConverter.class.getClassLoader()) &&
ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", CompositeMessageConverter.class.getClassLoader());
private final List<MessageConverter> converters;
public CompositeMessageConverter(List<MessageConverter> converters) {
if (converters == null) {
this.converters = new ArrayList<MessageConverter>();
this.converters.add(new ByteArrayMessageConverter());
this.converters.add(new StringMessageConverter());
if (jackson2Present) {
this.converters.add(new MappingJackson2MessageConverter());
}
}
else {
this.converters = converters;
}
}
@Override
public boolean canConvertFromPayload(Class<?> clazz, MediaType contentType) {
for (MessageConverter converter : this.converters) {
if (converter.canConvertFromPayload(clazz, contentType)) {
return true;
}
}
return false;
}
@Override
public Object convertFromPayload(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException, ContentTypeNotSupportedException {
for (MessageConverter converter : this.converters) {
if (converter.canConvertFromPayload(clazz, contentType)) {
return converter.convertFromPayload(clazz, contentType, payload);
}
}
throw new ContentTypeNotSupportedException(contentType, clazz);
}
@Override
public boolean canConvertToPayload(Class<?> clazz, MediaType mediaType) {
for (MessageConverter converter : this.converters) {
if (converter.canConvertToPayload(clazz, mediaType)) {
return true;
}
}
return false;
}
@Override
public byte[] convertToPayload(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
for (MessageConverter converter : this.converters) {
if (converter.canConvertToPayload(content.getClass(), contentType)) {
return converter.convertToPayload(content, contentType);
}
}
throw new ContentTypeNotSupportedException(contentType, content.getClass());
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import org.springframework.http.MediaType;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ContentTypeNotSupportedException extends Exception {
private static final long serialVersionUID = -3597879520747071896L;
private final MediaType mediaType;
private final Class<?> sourceOrTargetType;
public ContentTypeNotSupportedException(MediaType mediaType, Class<?> sourceOrTargetType) {
super("Content type '" + mediaType + "' not supported");
this.mediaType = mediaType;
this.sourceOrTargetType = sourceOrTargetType;
}
public MediaType getMediaType() {
return this.mediaType;
}
public Class<?> getSourceOrTargetType() {
return this.sourceOrTargetType;
}
}

View File

@ -1,215 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.util.Assert;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
* Implementation of {@link MessageConverter} that can read and write JSON using <a
* href="http://jackson.codehaus.org/">Jackson 2.x's</a> {@link ObjectMapper}.
* <p>
* This converter can be used to bind to typed beans, or untyped {@link java.util.HashMap
* HashMap} instances.
* <p>
* By default, this converter supports {@code application/json}. This can be overridden by
* setting the {@link #setSupportedMediaTypes supportedMediaTypes} property.
* <p>
* Tested against Jackson 2.2; compatible with Jackson 2.0 and higher.
*
* @author Rossen Stoyanchev
* @author Arjen Poutsma
* @since 4.0
*/
public class MappingJackson2MessageConverter extends AbstractMessageConverter {
private ObjectMapper objectMapper = new ObjectMapper();
private boolean prefixJson = false;
private Boolean prettyPrint;
/**
* Construct a new {@code MappingJackson2HttpMessageConverter}.
*/
public MappingJackson2MessageConverter() {
super(new MediaType("application", "json"), new MediaType("application", "*+json"));
}
/**
* Set the {@code ObjectMapper} for this view. If not set, a default
* {@link ObjectMapper#ObjectMapper() ObjectMapper} is used.
* <p>
* Setting a custom-configured {@code ObjectMapper} is one way to take further control
* of the JSON serialization process. For example, an extended
* {@link org.codehaus.jackson.map.SerializerFactory} can be configured that provides
* custom serializers for specific types. The other option for refining the
* serialization process is to use Jackson's provided annotations on the types to be
* serialized, in which case a custom-configured ObjectMapper is unnecessary.
*/
public void setObjectMapper(ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
this.objectMapper = objectMapper;
configurePrettyPrint();
}
/**
* Return the underlying {@code ObjectMapper} for this view.
*/
public ObjectMapper getObjectMapper() {
return this.objectMapper;
}
/**
* Indicate whether the JSON output by this view should be prefixed with "{} &&".
* Default is false.
* <p>
* Prefixing the JSON string in this manner is used to help prevent JSON Hijacking.
* The prefix renders the string syntactically invalid as a script so that it cannot
* be hijacked. This prefix does not affect the evaluation of JSON, but if JSON
* validation is performed on the string, the prefix would need to be ignored.
*/
public void setPrefixJson(boolean prefixJson) {
this.prefixJson = prefixJson;
}
/**
* Whether to use the {@link DefaultPrettyPrinter} when writing JSON.
* This is a shortcut for setting up an {@code ObjectMapper} as follows:
* <pre class="code">
* ObjectMapper mapper = new ObjectMapper();
* mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
* converter.setObjectMapper(mapper);
* </pre>
*/
public void setPrettyPrint(boolean prettyPrint) {
this.prettyPrint = prettyPrint;
configurePrettyPrint();
}
private void configurePrettyPrint() {
if (this.prettyPrint != null) {
this.objectMapper.configure(SerializationFeature.INDENT_OUTPUT, this.prettyPrint);
}
}
@Override
public boolean canConvertFromPayload(Class<?> clazz, MediaType mediaType) {
JavaType javaType = getJavaType(clazz, null);
return (this.objectMapper.canDeserialize(javaType) && canConvertFrom(mediaType));
}
@Override
public boolean canConvertToPayload(Class<?> clazz, MediaType mediaType) {
return (this.objectMapper.canSerialize(clazz) && canConvertTo(mediaType));
}
@Override
protected boolean supports(Class<?> clazz) {
// should not be called, since we override canRead/Write instead
throw new UnsupportedOperationException();
}
@Override
protected Object convertFromPayloadInternal(Class<? extends Object> clazz,
MediaType contentType, byte[] payload) throws IOException {
JavaType javaType = getJavaType(clazz, null);
return readJavaType(javaType, payload);
}
private Object readJavaType(JavaType javaType, byte[] payload) {
try {
return this.objectMapper.readValue(payload, javaType);
}
catch (IOException ex) {
throw new HttpMessageNotReadableException("Could not read JSON: " + ex.getMessage(), ex);
}
}
@Override
protected byte[] convertToPayloadInternal(Object object, MediaType contentType) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// The following has been deprecated as late as Jackson 2.2 (April 2013);
// preserved for the time being, for Jackson 2.0/2.1 compatibility.
@SuppressWarnings("deprecation")
JsonGenerator jsonGenerator =
this.objectMapper.getJsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
// A workaround for JsonGenerators not applying serialization features
// https://github.com/FasterXML/jackson-databind/issues/12
if (this.objectMapper.isEnabled(SerializationFeature.INDENT_OUTPUT)) {
jsonGenerator.useDefaultPrettyPrinter();
}
try {
if (this.prefixJson) {
jsonGenerator.writeRaw("{} && ");
}
this.objectMapper.writeValue(jsonGenerator, object);
}
catch (JsonProcessingException ex) {
// TODO: more specific exception
throw new IllegalStateException("Could not write JSON: " + ex.getMessage(), ex);
}
return out.toByteArray();
}
/**
* Return the Jackson {@link JavaType} for the specified type and context class.
* <p>The default implementation returns {@code typeFactory.constructType(type, contextClass)},
* but this can be overridden in subclasses, to allow for custom generic collection handling.
* For instance:
* <pre class="code">
* protected JavaType getJavaType(Type type) {
* if (type instanceof Class && List.class.isAssignableFrom((Class)type)) {
* return TypeFactory.collectionType(ArrayList.class, MyBean.class);
* } else {
* return super.getJavaType(type);
* }
* }
* </pre>
* @param type the type to return the java type for
* @param contextClass a context class for the target type, for example a class
* in which the target type appears in a method signature, can be {@code null}
* signature, can be {@code null}
* @return the java type
*/
protected JavaType getJavaType(Type type, Class<?> contextClass) {
return this.objectMapper.getTypeFactory().constructType(type, contextClass);
}
}

View File

@ -1,77 +0,0 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.IOException;
import org.springframework.http.MediaType;
/**
* Strategy for converting a byte array message payload to and from a typed object.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface MessageConverter {
/**
* Whether instances of the given class can be converted to from a byte array.
*
* @param clazz the class to convert from
* @param mediaType the media type of the content, can be {@code null} if not
* specified. Typically the value of a {@code Content-Type} header.
* @return {@code true} if it can be converted; {@code false} otherwise
*/
boolean canConvertFromPayload(Class<?> clazz, MediaType mediaType);
/**
* Convert the byte array payload to the given type.
*
* @param clazz the type of object to return. This type must have previously been
* passed to {@link #canConvertFromPayload(Class, MediaType)} and it must have
* returned {@code true}.
* @param contentType the content type of the payload, can be {@code null}
* @param payload the payload to convert from
* @return the converted object
* @throws IOException in case of I/O errors
*/
Object convertFromPayload(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException, ContentTypeNotSupportedException;
/**
* Whether instances of the given class can be converted to a byte array.
*
* @param clazz the class to test
* @param mediaType the media type of the content, can be {@code null} if not specified.
* @return {@code true} if writable; {@code false} otherwise
*/
boolean canConvertToPayload(Class<?> clazz, MediaType mediaType);
/**
* Convert the given object to a byte array.
*
* @param t the object to convert. The type of this object must have previously been
* passed to {@link #canConvertToPayload(Class, MediaType)} and it must have returned
* {@code true}.
* @param headers
* @return the output message
* @throws IOException in case of I/O errors
*/
byte[] convertToPayload(Object content, MediaType contentType) throws IOException, ContentTypeNotSupportedException;
}

View File

@ -1,55 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.IOException;
import java.nio.charset.Charset;
import org.springframework.http.MediaType;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StringMessageConverter extends AbstractMessageConverter {
private static final Charset UTF_8 = Charset.forName("UTF-8");
public StringMessageConverter() {
super(MediaType.TEXT_PLAIN);
}
@Override
protected boolean supports(Class<?> clazz) {
return String.class.equals(clazz);
}
@Override
protected String convertFromPayloadInternal(Class<?> clazz, MediaType contentType, byte[] payload)
throws IOException {
return new String(payload, UTF_8);
}
@Override
protected byte[] convertToPayloadInternal(Object content, MediaType contentType) throws IOException {
return ((String) content).getBytes(UTF_8);
}
}

View File

@ -35,6 +35,7 @@ import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.annotation.MessageMapping; import org.springframework.messaging.annotation.MessageMapping;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ClassUtils; import org.springframework.util.ClassUtils;
@ -42,7 +43,6 @@ import org.springframework.util.ReflectionUtils.MethodFilter;
import org.springframework.web.messaging.MessageType; import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.annotation.SubscribeEvent; import org.springframework.web.messaging.annotation.SubscribeEvent;
import org.springframework.web.messaging.annotation.UnsubscribeEvent; import org.springframework.web.messaging.annotation.UnsubscribeEvent;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.service.AbstractWebMessageHandler; import org.springframework.web.messaging.service.AbstractWebMessageHandler;
import org.springframework.web.messaging.support.MessageHolder; import org.springframework.web.messaging.support.MessageHolder;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor; import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
@ -61,7 +61,7 @@ public class AnnotationWebMessageHandler extends AbstractWebMessageHandler
private final MessageChannel outboundChannel; private final MessageChannel outboundChannel;
private List<MessageConverter> messageConverters; private MessageConverter<?> messageConverter;
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
@ -90,8 +90,11 @@ public class AnnotationWebMessageHandler extends AbstractWebMessageHandler
this.outboundChannel = outboundChannel; this.outboundChannel = outboundChannel;
} }
public void setMessageConverters(List<MessageConverter> converters) { /**
this.messageConverters = converters; * TODO: multiple converters with 'content-type' header
*/
public void setMessageConverter(MessageConverter<?> converter) {
this.messageConverter = converter;
} }
@Override @Override
@ -109,11 +112,10 @@ public class AnnotationWebMessageHandler extends AbstractWebMessageHandler
initHandlerMethods(); initHandlerMethods();
this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(this.inboundChannel)); this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverter));
this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters));
this.returnValueHandlers.addHandler(new MessageReturnValueHandler(this.outboundChannel)); this.returnValueHandlers.addHandler(
this.returnValueHandlers.addHandler(new PayloadReturnValueHandler(this.outboundChannel)); new MessageSendingReturnValueHandler(this.outboundChannel, this.messageConverter));
} }
protected void initHandlerMethods() { protected void initHandlerMethods() {

View File

@ -16,16 +16,11 @@
package org.springframework.web.messaging.service.method; package org.springframework.web.messaging.service.method;
import java.util.List;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.springframework.web.messaging.annotation.MessageBody; import org.springframework.web.messaging.annotation.MessageBody;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConversionException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
/** /**
@ -34,11 +29,12 @@ import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
*/ */
public class MessageBodyArgumentResolver implements ArgumentResolver { public class MessageBodyArgumentResolver implements ArgumentResolver {
private final MessageConverter converter; private final MessageConverter<?> converter;
public MessageBodyArgumentResolver(List<MessageConverter> converters) { public MessageBodyArgumentResolver(MessageConverter<?> converter) {
this.converter = new CompositeMessageConverter(converters); Assert.notNull(converter, "converter is required");
this.converter = converter;
} }
@Override @Override
@ -52,19 +48,16 @@ public class MessageBodyArgumentResolver implements ArgumentResolver {
Object arg = null; Object arg = null;
MessageBody annot = parameter.getParameterAnnotation(MessageBody.class); MessageBody annot = parameter.getParameterAnnotation(MessageBody.class);
MediaType contentType = (MediaType) message.getHeaders().get(WebMessageHeaderAccesssor.CONTENT_TYPE);
if (annot == null || annot.required()) { if (annot == null || annot.required()) {
Class<?> sourceType = message.getPayload().getClass(); Class<?> sourceClass = message.getPayload().getClass();
Class<?> parameterType = parameter.getParameterType(); Class<?> targetClass = parameter.getParameterType();
if (parameterType.isAssignableFrom(sourceType)) { if (targetClass.isAssignableFrom(sourceClass)) {
return message.getPayload(); return message.getPayload();
} }
else if (byte[].class.equals(sourceType)) {
return this.converter.convertFromPayload(parameterType, contentType, (byte[]) message.getPayload());
}
else { else {
throw new MessageConversionException(message, "Unexpected payload type", null); // TODO: use content-type header
return this.converter.fromMessage(message, targetClass);
} }
} }

View File

@ -1,49 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageChannelArgumentResolver implements ArgumentResolver {
private MessageChannel inboundChannel;
public MessageChannelArgumentResolver(MessageChannel inboundChannel) {
Assert.notNull(inboundChannel, "inboundChannel is required");
this.inboundChannel = inboundChannel;
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return MessageChannel.class.equals(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
return this.inboundChannel;
}
}

View File

@ -19,6 +19,7 @@ package org.springframework.web.messaging.service.method;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor; import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
@ -28,47 +29,50 @@ import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*/ */
public class MessageReturnValueHandler implements ReturnValueHandler { public class MessageSendingReturnValueHandler implements ReturnValueHandler {
private MessageChannel outboundChannel; private MessageChannel outboundChannel;
private final MessageConverter converter;
public MessageReturnValueHandler(MessageChannel outboundChannel) {
public MessageSendingReturnValueHandler(MessageChannel outboundChannel, MessageConverter<?> converter) {
Assert.notNull(outboundChannel, "outboundChannel is required"); Assert.notNull(outboundChannel, "outboundChannel is required");
Assert.notNull(converter, "converter is required");
this.outboundChannel = outboundChannel; this.outboundChannel = outboundChannel;
this.converter = converter;
} }
@Override @Override
public boolean supportsReturnType(MethodParameter returnType) { public boolean supportsReturnType(MethodParameter returnType) {
// TODO: List<Message> return value return true;
Class<?> paramType = returnType.getParameterType();
return Message.class.isAssignableFrom(paramType);
} }
@SuppressWarnings("unchecked")
@Override @Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception { throws Exception {
Assert.notNull(this.outboundChannel, "No clientChannel to send messages to"); if (returnValue == null) {
Message<?> returnMessage = (Message<?>) returnValue;
if (message == null) {
return; return;
} }
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message); WebMessageHeaderAccesssor inputHeaders = WebMessageHeaderAccesssor.wrap(message);
Assert.notNull(headers.getSubscriptionId(), "No subscription id: " + message); Message<?> returnMessage = (returnValue instanceof Message) ? (Message<?>) returnValue : null;
Object returnPayload = (returnMessage != null) ? returnMessage.getPayload() : returnValue;
WebMessageHeaderAccesssor returnHeaders = WebMessageHeaderAccesssor.wrap(returnMessage); WebMessageHeaderAccesssor returnHeaders = (returnMessage != null) ?
returnHeaders.setSessionId(headers.getSessionId()); WebMessageHeaderAccesssor.wrap(returnMessage) : WebMessageHeaderAccesssor.create();
returnHeaders.setSubscriptionId(headers.getSubscriptionId());
returnHeaders.setSessionId(inputHeaders.getSessionId());
returnHeaders.setSubscriptionId(inputHeaders.getSubscriptionId());
if (returnHeaders.getDestination() == null) { if (returnHeaders.getDestination() == null) {
returnHeaders.setDestination(headers.getDestination()); returnHeaders.setDestination(inputHeaders.getDestination());
} }
returnMessage = MessageBuilder.withPayload( returnMessage = this.converter.toMessage(returnPayload);
returnMessage.getPayload()).copyHeaders(returnHeaders.toMap()).build(); returnMessage = MessageBuilder.fromMessage(returnMessage).copyHeaders(returnHeaders.toMap()).build();
this.outboundChannel.send(returnMessage); this.outboundChannel.send(returnMessage);
} }

View File

@ -1,69 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.service.method;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class PayloadReturnValueHandler implements ReturnValueHandler {
private MessageChannel outboundChannel;
public PayloadReturnValueHandler(MessageChannel outboundChannel) {
Assert.notNull(outboundChannel, "outboundChannel is required");
this.outboundChannel = outboundChannel;
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return true;
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
Assert.notNull(this.outboundChannel, "No outboundChannel to send messages to");
if (returnValue == null) {
return;
}
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message);
WebMessageHeaderAccesssor returnHeaders = WebMessageHeaderAccesssor.create();
returnHeaders.setDestination(headers.getDestination());
returnHeaders.setSessionId(headers.getSessionId());
returnHeaders.setSubscriptionId(headers.getSubscriptionId());
Message<?> returnMessage = MessageBuilder.withPayload(
returnValue).copyHeaders(returnHeaders.toMap()).build();
this.outboundChannel.send(returnMessage);
}
}

View File

@ -27,27 +27,25 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.messaging.MessageType; import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.service.AbstractWebMessageHandler; import org.springframework.web.messaging.service.AbstractWebMessageHandler;
import org.springframework.web.messaging.stomp.StompCommand; import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor; import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
import reactor.core.Environment; import reactor.core.Environment;
import reactor.core.Promise; import reactor.core.composable.Promise;
import reactor.fn.Consumer; import reactor.function.Consumer;
import reactor.tcp.TcpClient; import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection; import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.DelimitedCodec; import reactor.tcp.encoding.DelimitedCodec;
import reactor.tcp.encoding.StandardCodecs; import reactor.tcp.encoding.StandardCodecs;
import reactor.tcp.netty.NettyTcpClient; import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;
/** /**
@ -71,8 +69,6 @@ public class StompRelayWebMessageHandler extends AbstractWebMessageHandler imple
private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
private MessageConverter payloadConverter;
private Environment environment; private Environment environment;
private TcpClient<String, String> tcpClient; private TcpClient<String, String> tcpClient;
@ -90,7 +86,6 @@ public class StompRelayWebMessageHandler extends AbstractWebMessageHandler imple
public StompRelayWebMessageHandler(MessageChannel outboundChannel) { public StompRelayWebMessageHandler(MessageChannel outboundChannel) {
Assert.notNull(outboundChannel, "outboundChannel is required"); Assert.notNull(outboundChannel, "outboundChannel is required");
this.outboundChannel = outboundChannel; this.outboundChannel = outboundChannel;
this.payloadConverter = new CompositeMessageConverter(null);
} }
@ -154,10 +149,6 @@ public class StompRelayWebMessageHandler extends AbstractWebMessageHandler imple
return this.systemPasscode; return this.systemPasscode;
} }
public void setMessageConverters(List<MessageConverter> converters) {
this.payloadConverter = new CompositeMessageConverter(converters);
}
@Override @Override
protected Collection<MessageType> getSupportedMessageTypes() { protected Collection<MessageType> getSupportedMessageTypes() {
return null; return null;
@ -183,9 +174,10 @@ public class StompRelayWebMessageHandler extends AbstractWebMessageHandler imple
@Override @Override
public void start() { public void start() {
synchronized (this.lifecycleMonitor) { synchronized (this.lifecycleMonitor) {
this.environment = new Environment(); this.environment = new Environment();
this.tcpClient = new TcpClient.Spec<String, String>(NettyTcpClient.class) this.tcpClient = new TcpClientSpec<String, String>(NettyTcpClient.class)
.using(this.environment) .env(this.environment)
.codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC)) .codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC))
.connect(this.relayHost, this.relayPort) .connect(this.relayHost, this.relayPort)
.get(); .get();
@ -284,10 +276,6 @@ public class StompRelayWebMessageHandler extends AbstractWebMessageHandler imple
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
headers.setStompCommandIfNotSet(command); headers.setStompCommandIfNotSet(command);
if (headers.getSessionId() == null && (StompCommand.SEND.equals(command))) {
}
String sessionId = headers.getSessionId(); String sessionId = headers.getSessionId();
if (sessionId == null) { if (sessionId == null) {
if (StompCommand.SEND.equals(command)) { if (StompCommand.SEND.equals(command)) {
@ -301,8 +289,7 @@ public class StompRelayWebMessageHandler extends AbstractWebMessageHandler imple
RelaySession session = this.relaySessions.get(sessionId); RelaySession session = this.relaySessions.get(sessionId);
if (session == null) { if (session == null) {
// TODO: default (non-user) session for sending messages? logger.warn("Session id=" + sessionId + " not found. Message cannot be forwarded: " + message);
logger.warn("No relay session for " + sessionId + ". Message '" + message + "' cannot be forwarded");
return; return;
} }
@ -438,16 +425,12 @@ public class StompRelayWebMessageHandler extends AbstractWebMessageHandler imple
try { try {
headers.setStompCommandIfNotSet(StompCommand.SEND); headers.setStompCommandIfNotSet(StompCommand.SEND);
MediaType contentType = headers.getContentType();
byte[] payload = payloadConverter.convertToPayload(message.getPayload(), contentType);
Message<?> byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toMap()).build();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Forwarding message " + byteMessage); logger.trace("Forwarding message " + message);
} }
byte[] bytesToWrite = stompMessageConverter.fromMessage(byteMessage); byte[] bytes = stompMessageConverter.fromMessage(message);
connection.send(new String(bytesToWrite, Charset.forName("UTF-8"))); connection.send(new String(bytes, Charset.forName("UTF-8")));
} }
catch (Throwable ex) { catch (Throwable ex) {
logger.error("Failed to forward message " + message, ex); logger.error("Failed to forward message " + message, ex);

View File

@ -17,21 +17,17 @@ package org.springframework.web.messaging.stomp.support;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.messaging.MessageType; import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompCommand; import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompConversionException; import org.springframework.web.messaging.stomp.StompConversionException;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor; import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
@ -59,8 +55,6 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<String, WebSocketSession>(); private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<String, WebSocketSession>();
private MessageConverter payloadConverter = new CompositeMessageConverter(null);
/** /**
* @param outputChannel the channel to which incoming STOMP/WebSocket messages should * @param outputChannel the channel to which incoming STOMP/WebSocket messages should
@ -72,15 +66,10 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
} }
public void setMessageConverters(List<MessageConverter> converters) {
this.payloadConverter = new CompositeMessageConverter(converters);
}
public StompMessageConverter getStompMessageConverter() { public StompMessageConverter getStompMessageConverter() {
return this.stompMessageConverter; return this.stompMessageConverter;
} }
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Assert.notNull(this.outputChannel, "No output channel for STOMP messages."); Assert.notNull(this.outputChannel, "No output channel for STOMP messages.");
@ -161,7 +150,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
Message<?> connectedMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders( Message<?> connectedMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(
connectedHeaders.toMap()).build(); connectedHeaders.toMap()).build();
byte[] bytes = getStompMessageConverter().fromMessage(connectedMessage); byte[] bytes = this.stompMessageConverter.fromMessage(connectedMessage);
session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8")))); session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8"))));
} }
@ -221,36 +210,32 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
String sessionId = headers.getSessionId(); String sessionId = headers.getSessionId();
if (sessionId == null) { if (sessionId == null) {
// TODO: failed message delivery mechanism // TODO: failed message delivery mechanism
logger.error("No \"sessionId\" header in message: " + message); logger.error("Ignoring message, no sessionId header: " + message);
return; return;
} }
WebSocketSession session = this.sessions.get(sessionId); WebSocketSession session = this.sessions.get(sessionId);
if (session == null) { if (session == null) {
// TODO: failed message delivery mechanism // TODO: failed message delivery mechanism
logger.error("WebSocketSession not found for sessionId=" + sessionId); logger.error("Ignoring message, session not found: " + sessionId);
return; return;
} }
if (headers.getSubscriptionId() == null) { if (headers.getSubscriptionId() == null) {
// TODO: failed message delivery mechanism // TODO: failed message delivery mechanism
logger.error("No subscription id: " + message); logger.error("Ignoring message, no subscriptionId header: " + message);
return; return;
} }
byte[] payload; if (!(message.getPayload() instanceof byte[])) {
try { // TODO: failed message delivery mechanism
MediaType contentType = headers.getContentType(); logger.error("Ignoring message, expected byte[] content: " + message);
payload = payloadConverter.convertToPayload(message.getPayload(), contentType);
}
catch (Throwable t) {
logger.error("Failed to send " + message, t);
return; return;
} }
try { try {
Message<?> byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toMap()).build(); message = MessageBuilder.fromMessage(message).copyHeaders(headers.toMap()).build();
byte[] bytes = getStompMessageConverter().fromMessage(byteMessage); byte[] bytes = this.stompMessageConverter.fromMessage(message);
session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8")))); session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8"))));
} }
catch (Throwable t) { catch (Throwable t) {