Remove generic params from Message/MessageChannel

This commit is contained in:
Rossen Stoyanchev 2013-06-24 11:14:48 -04:00
parent f7f66f2e5c
commit 32cb2ca2e7
26 changed files with 461 additions and 356 deletions

View File

@ -23,8 +23,7 @@ package org.springframework.messaging;
* @author Mark Fisher
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public interface MessageChannel<M extends Message> {
public interface MessageChannel {
/**
* Send a {@link Message} to this channel. May throw a RuntimeException for
@ -39,7 +38,7 @@ public interface MessageChannel<M extends Message> {
*
* @return whether or not the Message has been sent successfully
*/
boolean send(M message);
boolean send(Message<?> message);
/**
* Send a message, blocking until either the message is accepted or the
@ -52,6 +51,6 @@ public interface MessageChannel<M extends Message> {
* <code>false</code> if the specified timeout period elapses or
* the send is interrupted
*/
boolean send(M message, long timeout);
boolean send(Message<?> message, long timeout);
}

View File

@ -1,40 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
import java.util.Map;
/**
* A factory for creating messages, allowing for control of the concrete type of the message.
*
* @author Andy Wilkinson
* @since 4.0
*/
public interface MessageFactory<M extends Message<?>> {
/**
* Creates a new message with the given payload and headers
*
* @param payload The message payload
* @param headers The message headers
* @param <P> The payload's type
*
* @return the message
*/
<P> M createMessage(P payload, Map<String, Object> headers);
}

View File

@ -24,11 +24,10 @@ package org.springframework.messaging;
* @author Iwein Fuld
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public interface MessageHandler<M extends Message> {
public interface MessageHandler {
/**
* TODO: support exceptions?
* TODO: exceptions
*
* Handles the message if possible. If the handler cannot deal with the
* message this will result in a <code>MessageRejectedException</code> e.g.
@ -47,6 +46,6 @@ public interface MessageHandler<M extends Message> {
* @throws org.springframework.integration.MessageDeliveryException when this handler failed to deliver the
* reply related to the handling of the message
*/
void handleMessage(M message) throws MessagingException;
void handleMessage(Message<?> message) throws MessagingException;
}

View File

@ -38,9 +38,6 @@ import org.apache.commons.logging.LogFactory;
* IMPORTANT: MessageHeaders are immutable. Any mutating operation (e.g., put(..), putAll(..) etc.)
* will result in {@link UnsupportedOperationException}
*
* <p>
* TODO: update below instructions
*
* <p>To create MessageHeaders instance use fluent MessageBuilder API
* <pre>
* MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2");
@ -61,7 +58,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class MessageHeaders implements Map<String, Object>, Serializable {
private static final long serialVersionUID = 8946067357652612145L;
private static final long serialVersionUID = -4615750558355702881L;
private static final Log logger = LogFactory.getLog(MessageHeaders.class);
@ -77,6 +74,27 @@ public class MessageHeaders implements Map<String, Object>, Serializable {
public static final String TIMESTAMP = "timestamp";
public static final String CORRELATION_ID = "correlationId";
public static final String REPLY_CHANNEL = "replyChannel";
public static final String ERROR_CHANNEL = "errorChannel";
public static final String EXPIRATION_DATE = "expirationDate";
public static final String PRIORITY = "priority";
public static final String SEQUENCE_NUMBER = "sequenceNumber";
public static final String SEQUENCE_SIZE = "sequenceSize";
public static final String SEQUENCE_DETAILS = "sequenceDetails";
public static final String CONTENT_TYPE = "contentType";
public static final String POSTPROCESS_RESULT = "postProcessResult";
public static final List<String> HEADER_NAMES = Arrays.asList(ID, TIMESTAMP);
@ -103,6 +121,36 @@ public class MessageHeaders implements Map<String, Object>, Serializable {
return this.get(TIMESTAMP, Long.class);
}
public Long getExpirationDate() {
return this.get(EXPIRATION_DATE, Long.class);
}
public Object getCorrelationId() {
return this.get(CORRELATION_ID);
}
public Integer getSequenceNumber() {
Integer sequenceNumber = this.get(SEQUENCE_NUMBER, Integer.class);
return (sequenceNumber != null ? sequenceNumber : 0);
}
public Integer getSequenceSize() {
Integer sequenceSize = this.get(SEQUENCE_SIZE, Integer.class);
return (sequenceSize != null ? sequenceSize : 0);
}
public Integer getPriority() {
return this.get(PRIORITY, Integer.class);
}
public Object getReplyChannel() {
return this.get(REPLY_CHANNEL);
}
public Object getErrorChannel() {
return this.get(ERROR_CHANNEL);
}
@SuppressWarnings("unchecked")
public <T> T get(Object key, Class<T> type) {
Object value = this.headers.get(key);

View File

@ -0,0 +1,46 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* Interface for Message Channels from which Messages may be actively received through polling.
*
* @author Mark Fisher
* @since 4.0
*/
public interface PollableChannel extends MessageChannel {
/**
* Receive a message from this channel, blocking indefinitely if necessary.
*
* @return the next available {@link Message} or <code>null</code> if interrupted
*/
Message<?> receive();
/**
* Receive a message from this channel, blocking until either a message is
* available or the specified timeout period elapses.
*
* @param timeout the timeout in milliseconds
*
* @return the next available {@link Message} or <code>null</code> if the
* specified timeout period elapses or the message reception is interrupted
*/
Message<?> receive(long timeout);
}

View File

@ -25,18 +25,16 @@ package org.springframework.messaging;
* @author Mark Fisher
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public interface SubscribableChannel<M extends Message, H extends MessageHandler<M>>
extends MessageChannel<M> {
public interface SubscribableChannel extends MessageChannel {
/**
* Register a {@link MessageHandler} as a subscriber to this channel.
*/
boolean subscribe(H handler);
boolean subscribe(MessageHandler handler);
/**
* Remove a {@link MessageHandler} from the subscribers of this channel.
*/
boolean unsubscribe(H handler);
boolean unsubscribe(MessageHandler handler);
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -18,21 +18,24 @@ package org.springframework.messaging.support;
import java.util.Map;
import org.springframework.messaging.MessageFactory;
/**
* A {@link MessageFactory} that creates {@link GenericMessage GenericMessages}.
* A message implementation that accepts a {@link Throwable} payload.
* Once created this object is immutable.
*
* @author Andy Wilkinson
* @author Mark Fisher
* @author Oleg Zhurakousky
* @since 4.0
*/
public class GenericMessageFactory implements MessageFactory<GenericMessage<?>> {
public class ErrorMessage extends GenericMessage<Throwable> {
private static final long serialVersionUID = -5470210965279837728L;
@Override
public <P> GenericMessage<P> createMessage(P payload, Map<String, Object> headers) {
return new GenericMessage<P>(payload, headers);
public ErrorMessage(Throwable payload) {
super(payload);
}
public ErrorMessage(Throwable payload, Map<String, Object> headers) {
super(payload, headers);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,19 +17,25 @@
package org.springframework.messaging.support;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;
/**
* TODO
*
* @author Arjen Poutsma
* @author Mark Fisher
* @author Oleg Zhurakousky
@ -44,46 +50,24 @@ public final class MessageBuilder<T> {
private final Message<T> originalMessage;
@SuppressWarnings("rawtypes")
private static volatile MessageFactory messageFactory = null;
private volatile boolean modified;
/**
* A constructor with payload and an optional message to copy headers from.
* This is a private constructor to be invoked from the static factory methods only.
*
* @param payload the message payload, never {@code null}
* @param originalMessage a message to copy from or re-use if no changes are made, can
* be {@code null}
* Private constructor to be invoked from the static factory methods only.
*/
private MessageBuilder(T payload, Message<T> originalMessage) {
Assert.notNull(payload, "payload is required");
Assert.notNull(payload, "payload must not be null");
this.payload = payload;
this.originalMessage = originalMessage;
if (originalMessage != null) {
this.headers.putAll(originalMessage.getHeaders());
this.copyHeaders(originalMessage.getHeaders());
this.modified = (!this.payload.equals(originalMessage.getPayload()));
}
}
/**
* Private constructor to be invoked from the static factory methods only.
*
* @param payload the message payload, never {@code null}
* @param originalMessage a message to copy from or re-use if no changes are made, can
* be {@code null}
*/
private MessageBuilder(T payload, Map<String, Object> headers) {
Assert.notNull(payload, "payload is required");
Assert.notNull(headers, "headers is required");
this.payload = payload;
this.headers.putAll(headers);
this.originalMessage = null;
}
/**
* Create a builder for a new {@link Message} instance pre-populated with all of the
* headers copied from the provided message. The payload of the provided Message will
* also be used as the payload for the new message.
* Create a builder for a new {@link Message} instance pre-populated with all of the headers copied from the
* provided message. The payload of the provided Message will also be used as the payload for the new message.
*
* @param message the Message from which the payload and all headers will be copied
*/
@ -99,70 +83,69 @@ public final class MessageBuilder<T> {
* @param payload the payload for the new message
*/
public static <T> MessageBuilder<T> withPayload(T payload) {
MessageBuilder<T> builder = new MessageBuilder<T>(payload, (Message<T>) null);
MessageBuilder<T> builder = new MessageBuilder<T>(payload, null);
return builder;
}
/**
* Set the value for the given header name. If the provided value is <code>null</code>
* the header will be removed.
* Set the value for the given header name. If the provided value is <code>null</code>, the header will be removed.
*/
public MessageBuilder<T> setHeader(String headerName, Object headerValue) {
Assert.isTrue(!this.isReadOnly(headerName), "The '" + headerName + "' header is read-only.");
if (StringUtils.hasLength(headerName)) {
putOrRemove(headerName, headerValue);
if (StringUtils.hasLength(headerName) && !headerName.equals(MessageHeaders.ID)
&& !headerName.equals(MessageHeaders.TIMESTAMP)) {
this.verifyType(headerName, headerValue);
if (headerValue == null) {
Object removedValue = this.headers.remove(headerName);
if (removedValue != null) {
this.modified = true;
}
}
else {
Object replacedValue = this.headers.put(headerName, headerValue);
if (!headerValue.equals(replacedValue)) {
this.modified = true;
}
}
}
return this;
}
private boolean isReadOnly(String headerName) {
return MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName);
}
private void putOrRemove(String headerName, Object headerValue) {
if (headerValue == null) {
this.headers.remove(headerName);
}
else {
this.headers.put(headerName, headerValue);
}
}
/**
* Set the value for the given header name only if the header name is not already
* associated with a value.
* Set the value for the given header name only if the header name is not already associated with a value.
*/
public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue) {
if (this.headers.get(headerName) == null) {
putOrRemove(headerName, headerValue);
this.setHeader(headerName, headerValue);
}
return this;
}
/**
* Removes all headers provided via array of 'headerPatterns'. As the name suggests
* the array may contain simple matching patterns for header names. Supported pattern
* styles are: "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
* Removes all headers provided via array of 'headerPatterns'. As the name suggests the array
* may contain simple matching patterns for header names. Supported pattern styles are:
* "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
*
* @param headerPatterns
*/
public MessageBuilder<T> removeHeaders(String... headerPatterns) {
List<String> toRemove = new ArrayList<String>();
List<String> headersToRemove = new ArrayList<String>();
for (String pattern : headerPatterns) {
if (StringUtils.hasLength(pattern)){
if (pattern.contains("*")){
for (String headerName : this.headers.keySet()) {
if (PatternMatchUtils.simpleMatch(pattern, headerName)){
toRemove.add(headerName);
headersToRemove.add(headerName);
}
}
}
else {
toRemove.add(pattern);
headersToRemove.add(pattern);
}
}
}
for (String headerName : toRemove) {
this.headers.remove(headerName);
putOrRemove(headerName, null);
for (String headerToRemove : headersToRemove) {
this.removeHeader(headerToRemove);
}
return this;
}
@ -170,63 +153,182 @@ public final class MessageBuilder<T> {
* Remove the value for the given header name.
*/
public MessageBuilder<T> removeHeader(String headerName) {
if (StringUtils.hasLength(headerName) && !isReadOnly(headerName)) {
this.headers.remove(headerName);
if (StringUtils.hasLength(headerName) && !headerName.equals(MessageHeaders.ID)
&& !headerName.equals(MessageHeaders.TIMESTAMP)) {
Object removedValue = this.headers.remove(headerName);
if (removedValue != null) {
this.modified = true;
}
}
return this;
}
/**
* Copy the name-value pairs from the provided Map. This operation will overwrite any
* existing values. Use { {@link #copyHeadersIfAbsent(Map)} to avoid overwriting
* values. Note that the 'id' and 'timestamp' header values will never be overwritten.
* Copy the name-value pairs from the provided Map. This operation will overwrite any existing values. Use {
* {@link #copyHeadersIfAbsent(Map)} to avoid overwriting values. Note that the 'id' and 'timestamp' header values
* will never be overwritten.
*
* @see MessageHeaders#ID
* @see MessageHeaders#TIMESTAMP
*/
public MessageBuilder<T> copyHeaders(Map<String, ?> headersToCopy) {
Set<String> keys = headersToCopy.keySet();
for (String key : keys) {
if (!this.isReadOnly(key)) {
putOrRemove(key, headersToCopy.get(key));
this.setHeader(key, headersToCopy.get(key));
}
}
return this;
}
/**
* Copy the name-value pairs from the provided Map. This operation will <em>not</em>
* overwrite any existing values.
* Copy the name-value pairs from the provided Map. This operation will <em>not</em> overwrite any existing values.
*/
public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headersToCopy) {
Set<String> keys = headersToCopy.keySet();
for (String key : keys) {
if (!this.isReadOnly(key) && (this.headers.get(key) == null)) {
putOrRemove(key, headersToCopy.get(key));
if (!this.isReadOnly(key)) {
this.setHeaderIfAbsent(key, headersToCopy.get(key));
}
}
return this;
}
@SuppressWarnings("unchecked")
public Message<T> build() {
public MessageBuilder<T> setExpirationDate(Long expirationDate) {
return this.setHeader(MessageHeaders.EXPIRATION_DATE, expirationDate);
}
if (this.originalMessage != null
&& this.headers.equals(this.originalMessage.getHeaders())
&& this.payload.equals(this.originalMessage.getPayload())) {
return this.originalMessage;
}
// if (this.payload instanceof Throwable) {
// return (Message<T>) new ErrorMessage((Throwable) this.payload, this.headers);
// }
this.headers.remove(MessageHeaders.ID);
this.headers.remove(MessageHeaders.TIMESTAMP);
if (messageFactory == null) {
return new GenericMessage<T>(this.payload, this.headers);
public MessageBuilder<T> setExpirationDate(Date expirationDate) {
if (expirationDate != null) {
return this.setHeader(MessageHeaders.EXPIRATION_DATE, expirationDate.getTime());
}
else {
return messageFactory.createMessage(payload, headers);
return this.setHeader(MessageHeaders.EXPIRATION_DATE, null);
}
}
public MessageBuilder<T> setCorrelationId(Object correlationId) {
return this.setHeader(MessageHeaders.CORRELATION_ID, correlationId);
}
public MessageBuilder<T> pushSequenceDetails(Object correlationId, int sequenceNumber, int sequenceSize) {
Object incomingCorrelationId = headers.get(MessageHeaders.CORRELATION_ID);
@SuppressWarnings("unchecked")
List<List<Object>> incomingSequenceDetails = (List<List<Object>>) headers.get(MessageHeaders.SEQUENCE_DETAILS);
if (incomingCorrelationId != null) {
if (incomingSequenceDetails == null) {
incomingSequenceDetails = new ArrayList<List<Object>>();
}
else {
incomingSequenceDetails = new ArrayList<List<Object>>(incomingSequenceDetails);
}
incomingSequenceDetails.add(Arrays.asList(incomingCorrelationId,
headers.get(MessageHeaders.SEQUENCE_NUMBER), headers.get(MessageHeaders.SEQUENCE_SIZE)));
incomingSequenceDetails = Collections.unmodifiableList(incomingSequenceDetails);
}
if (incomingSequenceDetails != null) {
setHeader(MessageHeaders.SEQUENCE_DETAILS, incomingSequenceDetails);
}
return setCorrelationId(correlationId).setSequenceNumber(sequenceNumber).setSequenceSize(sequenceSize);
}
public MessageBuilder<T> popSequenceDetails() {
String key = MessageHeaders.SEQUENCE_DETAILS;
if (!headers.containsKey(key)) {
return this;
}
@SuppressWarnings("unchecked")
List<List<Object>> incomingSequenceDetails = new ArrayList<List<Object>>((List<List<Object>>) headers.get(key));
List<Object> sequenceDetails = incomingSequenceDetails.remove(incomingSequenceDetails.size() - 1);
Assert.state(sequenceDetails.size() == 3, "Wrong sequence details (not created by MessageBuilder?): "
+ sequenceDetails);
setCorrelationId(sequenceDetails.get(0));
Integer sequenceNumber = (Integer) sequenceDetails.get(1);
Integer sequenceSize = (Integer) sequenceDetails.get(2);
if (sequenceNumber != null) {
setSequenceNumber(sequenceNumber);
}
if (sequenceSize != null) {
setSequenceSize(sequenceSize);
}
if (!incomingSequenceDetails.isEmpty()) {
headers.put(MessageHeaders.SEQUENCE_DETAILS, incomingSequenceDetails);
}
else {
headers.remove(MessageHeaders.SEQUENCE_DETAILS);
}
return this;
}
public MessageBuilder<T> setReplyChannel(MessageChannel replyChannel) {
return this.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel);
}
public MessageBuilder<T> setReplyChannelName(String replyChannelName) {
return this.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName);
}
public MessageBuilder<T> setErrorChannel(MessageChannel errorChannel) {
return this.setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel);
}
public MessageBuilder<T> setErrorChannelName(String errorChannelName) {
return this.setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName);
}
public MessageBuilder<T> setSequenceNumber(Integer sequenceNumber) {
return this.setHeader(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber);
}
public MessageBuilder<T> setSequenceSize(Integer sequenceSize) {
return this.setHeader(MessageHeaders.SEQUENCE_SIZE, sequenceSize);
}
public MessageBuilder<T> setPriority(Integer priority) {
return this.setHeader(MessageHeaders.PRIORITY, priority);
}
@SuppressWarnings("unchecked")
public Message<T> build() {
if (!this.modified && this.originalMessage != null) {
return this.originalMessage;
}
if (this.payload instanceof Throwable) {
return (Message<T>) new ErrorMessage((Throwable) this.payload, this.headers);
}
return new GenericMessage<T>(this.payload, this.headers);
}
private boolean isReadOnly(String headerName) {
return MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName);
}
private void verifyType(String headerName, Object headerValue) {
if (headerName != null && headerValue != null) {
if (MessageHeaders.ID.equals(headerName)) {
Assert.isTrue(headerValue instanceof UUID, "The '" + headerName + "' header value must be a UUID.");
}
else if (MessageHeaders.TIMESTAMP.equals(headerName)) {
Assert.isTrue(headerValue instanceof Long, "The '" + headerName + "' header value must be a Long.");
}
else if (MessageHeaders.EXPIRATION_DATE.equals(headerName)) {
Assert.isTrue(headerValue instanceof Date || headerValue instanceof Long, "The '" + headerName
+ "' header value must be a Date or Long.");
}
else if (MessageHeaders.ERROR_CHANNEL.equals(headerName)
|| MessageHeaders.REPLY_CHANNEL.endsWith(headerName)) {
Assert.isTrue(headerValue instanceof MessageChannel || headerValue instanceof String, "The '"
+ headerName + "' header value must be a MessageChannel or String.");
}
else if (MessageHeaders.SEQUENCE_NUMBER.equals(headerName)
|| MessageHeaders.SEQUENCE_SIZE.equals(headerName)) {
Assert.isTrue(Integer.class.isAssignableFrom(headerValue.getClass()), "The '" + headerName
+ "' header value must be an Integer.");
}
else if (MessageHeaders.PRIORITY.equals(headerName)) {
Assert.isTrue(Integer.class.isAssignableFrom(headerValue.getClass()), "The '" + headerName
+ "' header value must be an Integer.");
}
}
}

View File

@ -16,8 +16,6 @@
package org.springframework.web.messaging;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
@ -25,24 +23,22 @@ import org.springframework.messaging.SubscribableChannel;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public interface PubSubChannelRegistry<M extends Message, H extends MessageHandler<M>> {
public interface PubSubChannelRegistry {
/**
* A channel for messaging arriving from clients.
*/
SubscribableChannel<M, H> getClientInputChannel();
SubscribableChannel getClientInputChannel();
/**
* A channel for sending direct messages to a client. The client must be have
* previously subscribed to the destination of the message.
*/
SubscribableChannel<M, H> getClientOutputChannel();
SubscribableChannel getClientOutputChannel();
/**
* A channel for broadcasting messages through a message broker.
*/
SubscribableChannel<M, H> getMessageBrokerChannel();
SubscribableChannel getMessageBrokerChannel();
}

View File

@ -37,8 +37,7 @@ import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public abstract class AbstractPubSubMessageHandler<M extends Message> implements MessageHandler<M> {
public abstract class AbstractPubSubMessageHandler implements MessageHandler {
protected final Log logger = LogFactory.getLog(getClass());
@ -68,7 +67,7 @@ public abstract class AbstractPubSubMessageHandler<M extends Message> implements
protected abstract Collection<MessageType> getSupportedMessageTypes();
protected boolean canHandle(M message, MessageType messageType) {
protected boolean canHandle(Message<?> message, MessageType messageType) {
if (!CollectionUtils.isEmpty(getSupportedMessageTypes())) {
if (!getSupportedMessageTypes().contains(messageType)) {
@ -79,7 +78,7 @@ public abstract class AbstractPubSubMessageHandler<M extends Message> implements
return isDestinationAllowed(message);
}
protected boolean isDestinationAllowed(M message) {
protected boolean isDestinationAllowed(Message<?> message) {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
String destination = headers.getDestination();
@ -115,7 +114,7 @@ public abstract class AbstractPubSubMessageHandler<M extends Message> implements
}
@Override
public final void handleMessage(M message) throws MessagingException {
public final void handleMessage(Message<?> message) throws MessagingException {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
MessageType messageType = headers.getMessageType();
@ -144,22 +143,22 @@ public abstract class AbstractPubSubMessageHandler<M extends Message> implements
}
}
protected void handleConnect(M message) {
protected void handleConnect(Message<?> message) {
}
protected void handlePublish(M message) {
protected void handlePublish(Message<?> message) {
}
protected void handleSubscribe(M message) {
protected void handleSubscribe(Message<?> message) {
}
protected void handleUnsubscribe(M message) {
protected void handleUnsubscribe(Message<?> message) {
}
protected void handleDisconnect(M message) {
protected void handleDisconnect(Message<?> message) {
}
protected void handleOther(M message) {
protected void handleOther(Message<?> message) {
}
}

View File

@ -44,10 +44,9 @@ import reactor.fn.selector.ObjectSelector;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class ReactorPubSubMessageHandler<M extends Message> extends AbstractPubSubMessageHandler<M> {
public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler {
private MessageChannel<M> clientChannel;
private MessageChannel clientChannel;
private final Reactor reactor;
@ -56,7 +55,7 @@ public class ReactorPubSubMessageHandler<M extends Message> extends AbstractPubS
private Map<String, List<Registration<?>>> subscriptionsBySession = new ConcurrentHashMap<String, List<Registration<?>>>();
public ReactorPubSubMessageHandler(PubSubChannelRegistry<M, ?> registry, Reactor reactor) {
public ReactorPubSubMessageHandler(PubSubChannelRegistry registry, Reactor reactor) {
Assert.notNull(reactor, "reactor is required");
this.clientChannel = registry.getClientOutputChannel();
this.reactor = reactor;
@ -73,7 +72,7 @@ public class ReactorPubSubMessageHandler<M extends Message> extends AbstractPubS
}
@Override
public void handleSubscribe(M message) {
public void handleSubscribe(Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("Subscribe " + message);
@ -100,7 +99,7 @@ public class ReactorPubSubMessageHandler<M extends Message> extends AbstractPubS
}
@Override
public void handlePublish(M message) {
public void handlePublish(Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("Message received: " + message);
@ -110,8 +109,7 @@ public class ReactorPubSubMessageHandler<M extends Message> extends AbstractPubS
// Convert to byte[] payload before the fan-out
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
byte[] payload = payloadConverter.convertToPayload(message.getPayload(), headers.getContentType());
@SuppressWarnings("unchecked")
M m = (M) MessageBuilder.withPayload(payload).copyHeaders(message.getHeaders()).build();
Message<?> m = MessageBuilder.withPayload(payload).copyHeaders(message.getHeaders()).build();
this.reactor.notify(getPublishKey(headers.getDestination()), Event.wrap(m));
}
@ -121,7 +119,7 @@ public class ReactorPubSubMessageHandler<M extends Message> extends AbstractPubS
}
@Override
public void handleDisconnect(M message) {
public void handleDisconnect(Message<?> message) {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
removeSubscriptions(headers.getSessionId());
}
@ -154,8 +152,8 @@ public class ReactorPubSubMessageHandler<M extends Message> extends AbstractPubS
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(sentMessage);
headers.setSubscriptionId(this.subscriptionId);
@SuppressWarnings("unchecked")
M clientMessage = (M) MessageBuilder.withPayload(sentMessage.getPayload()).copyHeaders(headers.toHeaders()).build();
Message<?> clientMessage = MessageBuilder.withPayload(
sentMessage.getPayload()).copyHeaders(headers.toHeaders()).build();
clientChannel.send(clientMessage);
}

View File

@ -54,11 +54,10 @@ import org.springframework.web.method.HandlerMethodSelector;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractPubSubMessageHandler<M>
public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler
implements ApplicationContextAware, InitializingBean {
private PubSubChannelRegistry<M, ?> registry;
private PubSubChannelRegistry registry;
private List<MessageConverter> messageConverters;
@ -73,12 +72,12 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
private final Map<Class<?>, MessageExceptionHandlerMethodResolver> exceptionHandlerCache =
new ConcurrentHashMap<Class<?>, MessageExceptionHandlerMethodResolver>(64);
private ArgumentResolverComposite<M> argumentResolvers = new ArgumentResolverComposite<M>();
private ArgumentResolverComposite argumentResolvers = new ArgumentResolverComposite();
private ReturnValueHandlerComposite<M> returnValueHandlers = new ReturnValueHandlerComposite<M>();
private ReturnValueHandlerComposite returnValueHandlers = new ReturnValueHandlerComposite();
public AnnotationPubSubMessageHandler(PubSubChannelRegistry<M, ?> registry) {
public AnnotationPubSubMessageHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.registry = registry;
}
@ -102,10 +101,10 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
initHandlerMethods();
this.argumentResolvers.addResolver(new MessageChannelArgumentResolver<M>(this.registry.getMessageBrokerChannel()));
this.argumentResolvers.addResolver(new MessageBodyArgumentResolver<M>(this.messageConverters));
this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(this.registry.getMessageBrokerChannel()));
this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters));
this.returnValueHandlers.addHandler(new MessageReturnValueHandler<M>(this.registry.getClientOutputChannel()));
this.returnValueHandlers.addHandler(new MessageReturnValueHandler(this.registry.getClientOutputChannel()));
}
protected void initHandlerMethods() {
@ -139,9 +138,8 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
new UnsubscribeMappingInfoCreator(), this.unsubscribeMethods);
}
@SuppressWarnings("unchecked")
private <A extends Annotation> void initHandlerMethods(Object handler, Class<?> handlerType,
final Class<A> annotationType, MappingInfoCreator mappingInfoCreator,
final Class<A> annotationType, MappingInfoCreator<A> mappingInfoCreator,
Map<MappingInfo, HandlerMethod> handlerMethods) {
Set<Method> messageMethods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() {
@ -171,21 +169,21 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
}
@Override
public void handlePublish(M message) {
public void handlePublish(Message<?> message) {
handleMessageInternal(message, this.messageMethods);
}
@Override
public void handleSubscribe(M message) {
public void handleSubscribe(Message<?> message) {
handleMessageInternal(message, this.subscribeMethods);
}
@Override
public void handleUnsubscribe(M message) {
public void handleUnsubscribe(Message<?> message) {
handleMessageInternal(message, this.unsubscribeMethods);
}
private void handleMessageInternal(final M message, Map<MappingInfo, HandlerMethod> handlerMethods) {
private void handleMessageInternal(final Message<?> message, Map<MappingInfo, HandlerMethod> handlerMethods) {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
String destination = headers.getDestination();
@ -198,7 +196,7 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
HandlerMethod handlerMethod = match.createWithResolvedBean();
// TODO: avoid re-creating invocableHandlerMethod
InvocableMessageHandlerMethod<M> invocableHandlerMethod = new InvocableMessageHandlerMethod<M>(handlerMethod);
InvocableMessageHandlerMethod invocableHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod);
invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
try {
@ -225,9 +223,9 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
}
}
private void invokeExceptionHandler(M message, HandlerMethod handlerMethod, Exception ex) {
private void invokeExceptionHandler(Message<?> message, HandlerMethod handlerMethod, Exception ex) {
InvocableMessageHandlerMethod<M> invocableHandlerMethod;
InvocableMessageHandlerMethod invocableHandlerMethod;
Class<?> beanType = handlerMethod.getBeanType();
MessageExceptionHandlerMethodResolver resolver = this.exceptionHandlerCache.get(beanType);
if (resolver == null) {
@ -241,7 +239,7 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
return;
}
invocableHandlerMethod = new InvocableMessageHandlerMethod<M>(handlerMethod.getBean(), method);
invocableHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod.getBean(), method);
invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
try {

View File

@ -27,8 +27,7 @@ import org.springframework.messaging.Message;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public interface ArgumentResolver<M extends Message> {
public interface ArgumentResolver {
/**
* Whether the given {@linkplain MethodParameter method parameter} is
@ -54,6 +53,6 @@ public interface ArgumentResolver<M extends Message> {
*
* @throws Exception in case of errors with the preparation of argument values
*/
Object resolveArgument(MethodParameter parameter, M message) throws Exception;
Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception;
}

View File

@ -36,21 +36,20 @@ import org.springframework.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class ArgumentResolverComposite<M extends Message> implements ArgumentResolver<M> {
public class ArgumentResolverComposite implements ArgumentResolver {
protected final Log logger = LogFactory.getLog(getClass());
private final List<ArgumentResolver<M>> argumentResolvers = new LinkedList<ArgumentResolver<M>>();
private final List<ArgumentResolver> argumentResolvers = new LinkedList<ArgumentResolver>();
private final Map<MethodParameter, ArgumentResolver<M>> argumentResolverCache =
new ConcurrentHashMap<MethodParameter, ArgumentResolver<M>>(256);
private final Map<MethodParameter, ArgumentResolver> argumentResolverCache =
new ConcurrentHashMap<MethodParameter, ArgumentResolver>(256);
/**
* Return a read-only list with the contained resolvers, or an empty list.
*/
public List<ArgumentResolver<M>> getResolvers() {
public List<ArgumentResolver> getResolvers() {
return Collections.unmodifiableList(this.argumentResolvers);
}
@ -68,9 +67,9 @@ public class ArgumentResolverComposite<M extends Message> implements ArgumentRes
* @exception IllegalStateException if no suitable {@link ArgumentResolver} is found.
*/
@Override
public Object resolveArgument(MethodParameter parameter, M message) throws Exception {
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
ArgumentResolver<M> resolver = getArgumentResolver(parameter);
ArgumentResolver resolver = getArgumentResolver(parameter);
Assert.notNull(resolver, "Unknown parameter type [" + parameter.getParameterType().getName() + "]");
return resolver.resolveArgument(parameter, message);
}
@ -78,10 +77,10 @@ public class ArgumentResolverComposite<M extends Message> implements ArgumentRes
/**
* Find a registered {@link ArgumentResolver} that supports the given method parameter.
*/
private ArgumentResolver<M> getArgumentResolver(MethodParameter parameter) {
ArgumentResolver<M> result = this.argumentResolverCache.get(parameter);
private ArgumentResolver getArgumentResolver(MethodParameter parameter) {
ArgumentResolver result = this.argumentResolverCache.get(parameter);
if (result == null) {
for (ArgumentResolver<M> resolver : this.argumentResolvers) {
for (ArgumentResolver resolver : this.argumentResolvers) {
if (resolver.supportsParameter(parameter)) {
result = resolver;
this.argumentResolverCache.put(parameter, result);
@ -95,7 +94,7 @@ public class ArgumentResolverComposite<M extends Message> implements ArgumentRes
/**
* Add the given {@link ArgumentResolver}.
*/
public ArgumentResolverComposite<M> addResolver(ArgumentResolver<M> argumentResolver) {
public ArgumentResolverComposite addResolver(ArgumentResolver argumentResolver) {
this.argumentResolvers.add(argumentResolver);
return this;
}
@ -103,9 +102,9 @@ public class ArgumentResolverComposite<M extends Message> implements ArgumentRes
/**
* Add the given {@link ArgumentResolver}s.
*/
public ArgumentResolverComposite<M> addResolvers(List<? extends ArgumentResolver<M>> argumentResolvers) {
public ArgumentResolverComposite addResolvers(List<? extends ArgumentResolver> argumentResolvers) {
if (argumentResolvers != null) {
for (ArgumentResolver<M> resolver : argumentResolvers) {
for (ArgumentResolver resolver : argumentResolvers) {
this.argumentResolvers.add(resolver);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -44,10 +44,9 @@ import org.springframework.web.method.HandlerMethod;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMethod {
public class InvocableMessageHandlerMethod extends HandlerMethod {
private ArgumentResolverComposite<M> argumentResolvers = new ArgumentResolverComposite<M>();
private ArgumentResolverComposite argumentResolvers = new ArgumentResolverComposite();
private ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
@ -85,7 +84,7 @@ public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMet
* Set {@link ArgumentResolver}s to use to use for resolving method
* argument values.
*/
public void setMessageMethodArgumentResolvers(ArgumentResolverComposite<M> argumentResolvers) {
public void setMessageMethodArgumentResolvers(ArgumentResolverComposite argumentResolvers) {
this.argumentResolvers = argumentResolvers;
}
@ -107,7 +106,7 @@ public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMet
* @exception Exception raised if no suitable argument resolver can be found, or the
* method raised an exception
*/
public final Object invoke(M message, Object... providedArgs) throws Exception {
public final Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Object[] args = getMethodArgumentValues(message, providedArgs);
@ -130,7 +129,7 @@ public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMet
/**
* Get the method argument values for the current request.
*/
private Object[] getMethodArgumentValues(M message, Object... providedArgs) throws Exception {
private Object[] getMethodArgumentValues(Message<?> message, Object... providedArgs) throws Exception {
MethodParameter[] parameters = getMethodParameters();
Object[] args = new Object[parameters.length];

View File

@ -32,8 +32,7 @@ import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class MessageBodyArgumentResolver<M extends Message> implements ArgumentResolver<M> {
public class MessageBodyArgumentResolver implements ArgumentResolver {
private final MessageConverter converter;
@ -48,7 +47,7 @@ public class MessageBodyArgumentResolver<M extends Message> implements ArgumentR
}
@Override
public Object resolveArgument(MethodParameter parameter, M message) throws Exception {
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
Object arg = null;

View File

@ -26,13 +26,12 @@ import org.springframework.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class MessageChannelArgumentResolver<M extends Message> implements ArgumentResolver<M> {
public class MessageChannelArgumentResolver implements ArgumentResolver {
private MessageChannel<M> messageBrokerChannel;
private MessageChannel messageBrokerChannel;
public MessageChannelArgumentResolver(MessageChannel<M> messageBrokerChannel) {
public MessageChannelArgumentResolver(MessageChannel messageBrokerChannel) {
Assert.notNull(messageBrokerChannel, "messageBrokerChannel is required");
this.messageBrokerChannel = messageBrokerChannel;
}
@ -43,7 +42,7 @@ public class MessageChannelArgumentResolver<M extends Message> implements Argume
}
@Override
public Object resolveArgument(MethodParameter parameter, M message) throws Exception {
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
return this.messageBrokerChannel;
}

View File

@ -16,8 +16,6 @@
package org.springframework.web.messaging.service.method;
import java.util.Map;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@ -30,13 +28,12 @@ import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class MessageReturnValueHandler<M extends Message> implements ReturnValueHandler<M> {
public class MessageReturnValueHandler implements ReturnValueHandler {
private MessageChannel<M> clientChannel;
private MessageChannel clientChannel;
public MessageReturnValueHandler(MessageChannel<M> clientChannel) {
public MessageReturnValueHandler(MessageChannel clientChannel) {
Assert.notNull(clientChannel, "clientChannel is required");
this.clientChannel = clientChannel;
}
@ -59,28 +56,19 @@ public class MessageReturnValueHandler<M extends Message> implements ReturnValue
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, M message)
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
Assert.notNull(this.clientChannel, "No clientChannel to send messages to");
@SuppressWarnings("unchecked")
M returnMessage = (M) returnValue;
if (returnMessage == null) {
if (message == null) {
return;
}
returnMessage = processReturnMessage(returnMessage, message);
this.clientChannel.send(returnMessage);
}
protected M processReturnMessage(M returnMessage, M message) {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
Assert.notNull(headers.getSubscriptionId(), "No subscription id: " + message);
PubSubHeaderAccesssor returnHeaders = PubSubHeaderAccesssor.wrap(returnMessage);
PubSubHeaderAccesssor returnHeaders = PubSubHeaderAccesssor.wrap(message);
returnHeaders.setSessionId(headers.getSessionId());
returnHeaders.setSubscriptionId(headers.getSubscriptionId());
@ -88,12 +76,11 @@ public class MessageReturnValueHandler<M extends Message> implements ReturnValue
returnHeaders.setDestination(headers.getDestination());
}
return createMessage(returnMessage.getPayload(), returnHeaders.toHeaders());
}
Message<?> returnMessage = MessageBuilder.withPayload(
message.getPayload()).copyHeaders(headers.toHeaders()).build();
this.clientChannel.send(returnMessage);
}
@SuppressWarnings("unchecked")
private M createMessage(Object payload, Map<String, Object> headers) {
return (M) MessageBuilder.withPayload(payload).copyHeaders(headers).build();
}
}

View File

@ -27,8 +27,7 @@ import org.springframework.messaging.Message;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public interface ReturnValueHandler<M extends Message> {
public interface ReturnValueHandler {
/**
* Whether the given {@linkplain MethodParameter method return type} is
@ -51,6 +50,6 @@ public interface ReturnValueHandler<M extends Message> {
* @param message the message that caused this method to be called
* @throws Exception if the return value handling results in an error
*/
void handleReturnValue(Object returnValue, MethodParameter returnType, M message) throws Exception;
void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) throws Exception;
}

View File

@ -28,16 +28,15 @@ import org.springframework.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class ReturnValueHandlerComposite<M extends Message> implements ReturnValueHandler<M> {
public class ReturnValueHandlerComposite implements ReturnValueHandler {
private final List<ReturnValueHandler<M>> returnValueHandlers = new ArrayList<ReturnValueHandler<M>>();
private final List<ReturnValueHandler> returnValueHandlers = new ArrayList<ReturnValueHandler>();
/**
* Add the given {@link ReturnValueHandler}.
*/
public ReturnValueHandlerComposite<M> addHandler(ReturnValueHandler<M> returnValuehandler) {
public ReturnValueHandlerComposite addHandler(ReturnValueHandler returnValuehandler) {
this.returnValueHandlers.add(returnValuehandler);
return this;
}
@ -45,9 +44,9 @@ public class ReturnValueHandlerComposite<M extends Message> implements ReturnVal
/**
* Add the given {@link ReturnValueHandler}s.
*/
public ReturnValueHandlerComposite<M> addHandlers(List<? extends ReturnValueHandler<M>> handlers) {
public ReturnValueHandlerComposite addHandlers(List<? extends ReturnValueHandler> handlers) {
if (handlers != null) {
for (ReturnValueHandler<M> handler : handlers) {
for (ReturnValueHandler handler : handlers) {
this.returnValueHandlers.add(handler);
}
}
@ -59,8 +58,8 @@ public class ReturnValueHandlerComposite<M extends Message> implements ReturnVal
return getReturnValueHandler(returnType) != null;
}
private ReturnValueHandler<M> getReturnValueHandler(MethodParameter returnType) {
for (ReturnValueHandler<M> handler : this.returnValueHandlers) {
private ReturnValueHandler getReturnValueHandler(MethodParameter returnType) {
for (ReturnValueHandler handler : this.returnValueHandlers) {
if (handler.supportsReturnType(returnType)) {
return handler;
}
@ -69,10 +68,10 @@ public class ReturnValueHandlerComposite<M extends Message> implements ReturnVal
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, M message)
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
ReturnValueHandler<M> handler = getReturnValueHandler(returnType);
ReturnValueHandler handler = getReturnValueHandler(returnType);
Assert.notNull(handler, "Unknown return value type [" + returnType.getParameterType().getName() + "]");
handler.handleReturnValue(returnValue, returnType, message);
}

View File

@ -35,8 +35,7 @@ import org.springframework.web.messaging.stomp.StompConversionException;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class StompMessageConverter<M extends Message> {
public class StompMessageConverter {
private static final Charset STOMP_CHARSET = Charset.forName("UTF-8");
@ -49,7 +48,7 @@ public class StompMessageConverter<M extends Message> {
/**
* @param stompContent a complete STOMP message (without the trailing 0x00) as byte[] or String.
*/
public M toMessage(Object stompContent, String sessionId) {
public Message<?> toMessage(Object stompContent, String sessionId) {
byte[] byteContent = null;
if (stompContent instanceof String) {
@ -100,12 +99,7 @@ public class StompMessageConverter<M extends Message> {
byte[] payload = new byte[totalLength - payloadIndex];
System.arraycopy(byteContent, payloadIndex, payload, 0, totalLength - payloadIndex);
return createMessage(stompHeaders, payload);
}
@SuppressWarnings("unchecked")
private M createMessage(StompHeaderAccessor stompHeaders, byte[] payload) {
return (M) MessageBuilder.withPayload(payload).copyHeaders(stompHeaders.toHeaders()).build();
return MessageBuilder.withPayload(payload).copyHeaders(stompHeaders.toHeaders()).build();
}
private int findIndexOfPayload(byte[] bytes) {
@ -135,7 +129,7 @@ public class StompMessageConverter<M extends Message> {
return index;
}
public byte[] fromMessage(M message) {
public byte[] fromMessage(Message<?> message) {
byte[] payload;
if (message.getPayload() instanceof byte[]) {

View File

@ -55,15 +55,14 @@ import reactor.tcp.netty.NettyTcpClient;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractPubSubMessageHandler<M>
public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
implements SmartLifecycle {
private static final String STOMP_RELAY_SYSTEM_SESSION_ID = "stompRelaySystemSessionId";
private MessageChannel<M> clientChannel;
private MessageChannel clientChannel;
private final StompMessageConverter<M> stompMessageConverter = new StompMessageConverter<M>();
private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
private MessageConverter payloadConverter;
@ -80,7 +79,7 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
* @param clientChannel a channel for sending messages from the remote message broker
* back to clients
*/
public StompRelayPubSubMessageHandler(PubSubChannelRegistry<M, ?> registry) {
public StompRelayPubSubMessageHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.clientChannel = registry.getClientOutputChannel();
this.payloadConverter = new CompositeMessageConverter(null);
@ -129,12 +128,12 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
headers.setLogin("guest");
headers.setPasscode("guest");
headers.setHeartbeat(0, 0);
@SuppressWarnings("unchecked")
M message = (M) MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toStompMessageHeaders()).build();
Message<?> message = MessageBuilder.withPayload(
new byte[0]).copyHeaders(headers.toStompMessageHeaders()).build();
RelaySession session = new RelaySession(message, headers) {
@Override
protected void sendMessageToClient(M message) {
protected void sendMessageToClient(Message<?> message) {
// TODO: check for ERROR frame (reconnect?)
}
};
@ -166,7 +165,7 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
}
@Override
public void handleConnect(M message) {
public void handleConnect(Message<?> message) {
StompHeaderAccessor stompHeaders = StompHeaderAccessor.wrap(message);
String sessionId = stompHeaders.getSessionId();
if (sessionId == null) {
@ -178,22 +177,22 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
}
@Override
public void handlePublish(M message) {
public void handlePublish(Message<?> message) {
forwardMessage(message, StompCommand.SEND);
}
@Override
public void handleSubscribe(M message) {
public void handleSubscribe(Message<?> message) {
forwardMessage(message, StompCommand.SUBSCRIBE);
}
@Override
public void handleUnsubscribe(M message) {
public void handleUnsubscribe(Message<?> message) {
forwardMessage(message, StompCommand.UNSUBSCRIBE);
}
@Override
public void handleDisconnect(M message) {
public void handleDisconnect(Message<?> message) {
StompHeaderAccessor stompHeaders = StompHeaderAccessor.wrap(message);
if (stompHeaders.getStompCommand() != null) {
forwardMessage(message, StompCommand.DISCONNECT);
@ -206,13 +205,13 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
}
@Override
public void handleOther(M message) {
public void handleOther(Message<?> message) {
StompCommand command = (StompCommand) message.getHeaders().get(PubSubHeaderAccesssor.PROTOCOL_MESSAGE_TYPE);
Assert.notNull(command, "Expected STOMP command: " + message.getHeaders());
forwardMessage(message, command);
}
private void forwardMessage(M message, StompCommand command) {
private void forwardMessage(Message<?> message, StompCommand command) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
headers.setStompCommandIfNotSet(command);
@ -249,14 +248,14 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
private final Promise<TcpConnection<String, String>> promise;
private final BlockingQueue<M> messageQueue = new LinkedBlockingQueue<M>(50);
private final BlockingQueue<Message<?>> messageQueue = new LinkedBlockingQueue<Message<?>>(50);
private final Object monitor = new Object();
private volatile boolean isConnected = false;
public RelaySession(final M message, final StompHeaderAccessor stompHeaders) {
public RelaySession(final Message<?> message, final StompHeaderAccessor stompHeaders) {
Assert.notNull(message, "message is required");
Assert.notNull(stompHeaders, "stompHeaders is required");
@ -297,7 +296,7 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
return;
}
M message = stompMessageConverter.toMessage(stompFrame, this.sessionId);
Message<?> message = stompMessageConverter.toMessage(stompFrame, this.sessionId);
if (logger.isTraceEnabled()) {
logger.trace("Reading message " + message);
}
@ -319,7 +318,7 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
sendMessageToClient(message);
}
protected void sendMessageToClient(M message) {
protected void sendMessageToClient(Message<?> message) {
clientChannel.send(message);
}
@ -327,12 +326,11 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setSessionId(sessionId);
headers.setMessage(errorText);
@SuppressWarnings("unchecked")
M errorMessage = (M) MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build();
Message<?> errorMessage = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build();
sendMessageToClient(errorMessage);
}
public void forward(M message, StompHeaderAccessor headers) {
public void forward(Message<?> message, StompHeaderAccessor headers) {
if (!this.isConnected) {
synchronized(this.monitor) {
@ -358,7 +356,7 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
}
private void flushMessages(TcpConnection<String, String> connection) {
List<M> messages = new ArrayList<M>();
List<Message<?>> messages = new ArrayList<Message<?>>();
this.messageQueue.drainTo(messages);
for (Message<?> message : messages) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
@ -374,8 +372,7 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
MediaType contentType = headers.getContentType();
byte[] payload = payloadConverter.convertToPayload(message.getPayload(), contentType);
@SuppressWarnings("unchecked")
M byteMessage = (M) MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build();
Message<?> byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build();
if (logger.isTraceEnabled()) {
logger.trace("Forwarding message " + byteMessage);

View File

@ -51,24 +51,22 @@ import reactor.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandlerAdapter
implements MessageHandler<M> {
public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implements MessageHandler {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static Log logger = LogFactory.getLog(StompWebSocketHandler.class);
private MessageChannel<M> outputChannel;
private MessageChannel outputChannel;
private final StompMessageConverter<M> stompMessageConverter = new StompMessageConverter<M>();
private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
private final Map<String, SessionInfo> sessionInfos = new ConcurrentHashMap<String, SessionInfo>();
private MessageConverter payloadConverter = new CompositeMessageConverter(null);
public StompWebSocketHandler(PubSubChannelRegistry<M, ?> registry) {
public StompWebSocketHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.outputChannel = registry.getClientInputChannel();
}
@ -77,7 +75,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
this.payloadConverter = new CompositeMessageConverter(converters);
}
public StompMessageConverter<M> getStompMessageConverter() {
public StompMessageConverter getStompMessageConverter() {
return this.stompMessageConverter;
}
@ -95,7 +93,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) {
try {
String payload = textMessage.getPayload();
M message = this.stompMessageConverter.toMessage(payload, session.getId());
Message<?> message = this.stompMessageConverter.toMessage(payload, session.getId());
// TODO: validate size limits
// http://stomp.github.io/stomp-specification-1.2.html#Size_Limits
@ -138,7 +136,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
}
}
protected void handleConnect(final WebSocketSession session, M message) throws IOException {
protected void handleConnect(final WebSocketSession session, Message<?> message) throws IOException {
StompHeaderAccessor connectHeaders = StompHeaderAccessor.wrap(message);
StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);
@ -160,17 +158,16 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
// TODO: security
@SuppressWarnings("unchecked")
M connectedMessage = (M) MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(
Message<?> connectedMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(
connectedHeaders.toHeaders()).build();
byte[] bytes = getStompMessageConverter().fromMessage(connectedMessage);
session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8"))));
}
protected void handlePublish(M stompMessage) {
protected void handlePublish(Message<?> stompMessage) {
}
protected void handleSubscribe(M message) {
protected void handleSubscribe(Message<?> message) {
// TODO: need a way to communicate back if subscription was successfully created or
// not in which case an ERROR should be sent back and close the connection
@ -184,13 +181,13 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
sessionInfo.addSubscription(destination, headers.getSubscriptionId());
}
protected void handleUnsubscribe(M message) {
protected void handleUnsubscribe(Message<?> message) {
// TODO: remove subscription
}
protected void handleDisconnect(M stompMessage) {
protected void handleDisconnect(Message<?> stompMessage) {
}
protected void sendErrorMessage(WebSocketSession session, Throwable error) {
@ -198,8 +195,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setMessage(error.getMessage());
@SuppressWarnings("unchecked")
M message = (M) MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(headers.toHeaders()).build();
Message<?> message = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(headers.toHeaders()).build();
byte[] bytes = this.stompMessageConverter.fromMessage(message);
try {
@ -215,8 +211,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
this.sessionInfos.remove(session.getId());
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.create(MessageType.DISCONNECT);
headers.setSessionId(session.getId());
@SuppressWarnings("unchecked")
M message = (M) MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build();
Message<?> message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build();
this.outputChannel.send(message);
}
@ -224,7 +219,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
* Handle STOMP messages going back out to WebSocket clients.
*/
@Override
public void handleMessage(M message) {
public void handleMessage(Message<?> message) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
headers.setStompCommandIfNotSet(StompCommand.MESSAGE);
@ -269,8 +264,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
}
try {
@SuppressWarnings("unchecked")
M byteMessage = (M) MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build();
Message<?> byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build();
byte[] bytes = getStompMessageConverter().fromMessage(byteMessage);
session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8"))));
}

View File

@ -17,8 +17,6 @@
package org.springframework.web.messaging.support;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.web.messaging.PubSubChannelRegistry;
@ -28,41 +26,39 @@ import org.springframework.web.messaging.PubSubChannelRegistry;
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("rawtypes")
public class AbstractPubSubChannelRegistry<M extends Message, H extends MessageHandler<M>>
implements PubSubChannelRegistry<M, H>, InitializingBean {
public class AbstractPubSubChannelRegistry implements PubSubChannelRegistry, InitializingBean {
private SubscribableChannel<M, H> clientInputChannel;
private SubscribableChannel clientInputChannel;
private SubscribableChannel<M, H> clientOutputChannel;
private SubscribableChannel clientOutputChannel;
private SubscribableChannel<M, H> messageBrokerChannel;
private SubscribableChannel messageBrokerChannel;
@Override
public SubscribableChannel<M, H> getClientInputChannel() {
public SubscribableChannel getClientInputChannel() {
return this.clientInputChannel;
}
public void setClientInputChannel(SubscribableChannel<M, H> channel) {
public void setClientInputChannel(SubscribableChannel channel) {
this.clientInputChannel = channel;
}
@Override
public SubscribableChannel<M, H> getClientOutputChannel() {
public SubscribableChannel getClientOutputChannel() {
return this.clientOutputChannel;
}
public void setClientOutputChannel(SubscribableChannel<M, H> channel) {
public void setClientOutputChannel(SubscribableChannel channel) {
this.clientOutputChannel = channel;
}
@Override
public SubscribableChannel<M, H> getMessageBrokerChannel() {
public SubscribableChannel getMessageBrokerChannel() {
return this.messageBrokerChannel;
}
public void setMessageBrokerChannel(SubscribableChannel<M, H> channel) {
public void setMessageBrokerChannel(SubscribableChannel channel) {
this.messageBrokerChannel = channel;
}

View File

@ -36,7 +36,7 @@ import reactor.fn.selector.ObjectSelector;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReactorMessageChannel implements SubscribableChannel<Message<?>, MessageHandler<Message<?>>> {
public class ReactorMessageChannel implements SubscribableChannel {
private static Log logger = LogFactory.getLog(ReactorMessageChannel.class);
@ -47,8 +47,8 @@ public class ReactorMessageChannel implements SubscribableChannel<Message<?>, Me
private String name = toString(); // TODO
private final Map<MessageHandler<Message<?>>, Registration<?>> registrations =
new HashMap<MessageHandler<Message<?>>, Registration<?>>();
private final Map<MessageHandler, Registration<?>> registrations =
new HashMap<MessageHandler, Registration<?>>();
public ReactorMessageChannel(Reactor reactor) {
@ -78,7 +78,7 @@ public class ReactorMessageChannel implements SubscribableChannel<Message<?>, Me
}
@Override
public boolean subscribe(final MessageHandler<Message<?>> handler) {
public boolean subscribe(final MessageHandler handler) {
if (this.registrations.containsKey(handler)) {
logger.warn("Channel " + getName() + ", handler already subscribed " + handler);
@ -98,7 +98,7 @@ public class ReactorMessageChannel implements SubscribableChannel<Message<?>, Me
}
@Override
public boolean unsubscribe(MessageHandler<Message<?>> handler) {
public boolean unsubscribe(MessageHandler handler) {
if (logger.isTraceEnabled()) {
logger.trace("Channel " + getName() + ", removing subscription for handler " + handler);
@ -119,9 +119,9 @@ public class ReactorMessageChannel implements SubscribableChannel<Message<?>, Me
private static final class MessageHandlerConsumer implements Consumer<Event<Message<?>>> {
private final MessageHandler<Message<?>> handler;
private final MessageHandler handler;
private MessageHandlerConsumer(MessageHandler<Message<?>> handler) {
private MessageHandlerConsumer(MessageHandler handler) {
this.handler = handler;
}

View File

@ -16,8 +16,6 @@
package org.springframework.web.messaging.support;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import reactor.core.Reactor;
@ -27,7 +25,7 @@ import reactor.core.Reactor;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReactorPubSubChannelRegistry extends AbstractPubSubChannelRegistry<Message<?>, MessageHandler<Message<?>>> {
public class ReactorPubSubChannelRegistry extends AbstractPubSubChannelRegistry {
public ReactorPubSubChannelRegistry(Reactor reactor) {