diff --git a/spring-context/src/main/java/org/springframework/messaging/MessageChannel.java b/spring-context/src/main/java/org/springframework/messaging/MessageChannel.java index aae499bf94..6bb8128e9c 100644 --- a/spring-context/src/main/java/org/springframework/messaging/MessageChannel.java +++ b/spring-context/src/main/java/org/springframework/messaging/MessageChannel.java @@ -23,8 +23,7 @@ package org.springframework.messaging; * @author Mark Fisher * @since 4.0 */ -@SuppressWarnings("rawtypes") -public interface MessageChannel { +public interface MessageChannel { /** * Send a {@link Message} to this channel. May throw a RuntimeException for @@ -39,7 +38,7 @@ public interface MessageChannel { * * @return whether or not the Message has been sent successfully */ - boolean send(M message); + boolean send(Message message); /** * Send a message, blocking until either the message is accepted or the @@ -52,6 +51,6 @@ public interface MessageChannel { * false if the specified timeout period elapses or * the send is interrupted */ - boolean send(M message, long timeout); + boolean send(Message message, long timeout); } diff --git a/spring-context/src/main/java/org/springframework/messaging/MessageFactory.java b/spring-context/src/main/java/org/springframework/messaging/MessageFactory.java deleted file mode 100644 index ea0d7a483b..0000000000 --- a/spring-context/src/main/java/org/springframework/messaging/MessageFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging; - -import java.util.Map; - - -/** - * A factory for creating messages, allowing for control of the concrete type of the message. - * - * @author Andy Wilkinson - * @since 4.0 - */ -public interface MessageFactory> { - - /** - * Creates a new message with the given payload and headers - * - * @param payload The message payload - * @param headers The message headers - * @param

The payload's type - * - * @return the message - */ -

M createMessage(P payload, Map headers); -} diff --git a/spring-context/src/main/java/org/springframework/messaging/MessageHandler.java b/spring-context/src/main/java/org/springframework/messaging/MessageHandler.java index 8893717d49..5b637fe97d 100644 --- a/spring-context/src/main/java/org/springframework/messaging/MessageHandler.java +++ b/spring-context/src/main/java/org/springframework/messaging/MessageHandler.java @@ -24,11 +24,10 @@ package org.springframework.messaging; * @author Iwein Fuld * @since 4.0 */ -@SuppressWarnings("rawtypes") -public interface MessageHandler { +public interface MessageHandler { /** - * TODO: support exceptions? + * TODO: exceptions * * Handles the message if possible. If the handler cannot deal with the * message this will result in a MessageRejectedException e.g. @@ -47,6 +46,6 @@ public interface MessageHandler { * @throws org.springframework.integration.MessageDeliveryException when this handler failed to deliver the * reply related to the handling of the message */ - void handleMessage(M message) throws MessagingException; + void handleMessage(Message message) throws MessagingException; } diff --git a/spring-context/src/main/java/org/springframework/messaging/MessageHeaders.java b/spring-context/src/main/java/org/springframework/messaging/MessageHeaders.java index 7fb6a26aba..c9c25fa20c 100644 --- a/spring-context/src/main/java/org/springframework/messaging/MessageHeaders.java +++ b/spring-context/src/main/java/org/springframework/messaging/MessageHeaders.java @@ -38,9 +38,6 @@ import org.apache.commons.logging.LogFactory; * IMPORTANT: MessageHeaders are immutable. Any mutating operation (e.g., put(..), putAll(..) etc.) * will result in {@link UnsupportedOperationException} * - *

- * TODO: update below instructions - * *

To create MessageHeaders instance use fluent MessageBuilder API *

  * MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2");
@@ -61,7 +58,7 @@ import org.apache.commons.logging.LogFactory;
  */
 public class MessageHeaders implements Map, Serializable {
 
-	private static final long serialVersionUID = 8946067357652612145L;
+	private static final long serialVersionUID = -4615750558355702881L;
 
 	private static final Log logger = LogFactory.getLog(MessageHeaders.class);
 
@@ -77,6 +74,27 @@ public class MessageHeaders implements Map, Serializable {
 
 	public static final String TIMESTAMP = "timestamp";
 
+	public static final String CORRELATION_ID = "correlationId";
+
+	public static final String REPLY_CHANNEL = "replyChannel";
+
+	public static final String ERROR_CHANNEL = "errorChannel";
+
+	public static final String EXPIRATION_DATE = "expirationDate";
+
+	public static final String PRIORITY = "priority";
+
+	public static final String SEQUENCE_NUMBER = "sequenceNumber";
+
+	public static final String SEQUENCE_SIZE = "sequenceSize";
+
+	public static final String SEQUENCE_DETAILS = "sequenceDetails";
+
+	public static final String CONTENT_TYPE = "contentType";
+
+	public static final String POSTPROCESS_RESULT = "postProcessResult";
+
+
 	public static final List HEADER_NAMES = Arrays.asList(ID, TIMESTAMP);
 
 
@@ -103,6 +121,36 @@ public class MessageHeaders implements Map, Serializable {
 		return this.get(TIMESTAMP, Long.class);
 	}
 
+	public Long getExpirationDate() {
+		return this.get(EXPIRATION_DATE, Long.class);
+	}
+
+	public Object getCorrelationId() {
+		return this.get(CORRELATION_ID);
+	}
+
+	public Integer getSequenceNumber() {
+		Integer sequenceNumber = this.get(SEQUENCE_NUMBER, Integer.class);
+		return (sequenceNumber != null ? sequenceNumber : 0);
+	}
+
+	public Integer getSequenceSize() {
+		Integer sequenceSize = this.get(SEQUENCE_SIZE, Integer.class);
+		return (sequenceSize != null ? sequenceSize : 0);
+	}
+
+	public Integer getPriority() {
+		return this.get(PRIORITY, Integer.class);
+	}
+
+	public Object getReplyChannel() {
+        return this.get(REPLY_CHANNEL);
+    }
+
+    public Object getErrorChannel() {
+        return this.get(ERROR_CHANNEL);
+    }
+
 	@SuppressWarnings("unchecked")
 	public  T get(Object key, Class type) {
 		Object value = this.headers.get(key);
diff --git a/spring-context/src/main/java/org/springframework/messaging/PollableChannel.java b/spring-context/src/main/java/org/springframework/messaging/PollableChannel.java
new file mode 100644
index 0000000000..d8de221c81
--- /dev/null
+++ b/spring-context/src/main/java/org/springframework/messaging/PollableChannel.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging;
+
+
+/**
+ * Interface for Message Channels from which Messages may be actively received through polling.
+ *
+ * @author Mark Fisher
+ * @since 4.0
+ */
+public interface PollableChannel extends MessageChannel {
+
+	/**
+	 * Receive a message from this channel, blocking indefinitely if necessary.
+	 *
+	 * @return the next available {@link Message} or null if interrupted
+	 */
+	Message receive();
+
+	/**
+	 * Receive a message from this channel, blocking until either a message is
+	 * available or the specified timeout period elapses.
+	 *
+	 * @param timeout the timeout in milliseconds
+	 *
+	 * @return the next available {@link Message} or null if the
+	 * specified timeout period elapses or the message reception is interrupted
+	 */
+	Message receive(long timeout);
+
+}
diff --git a/spring-context/src/main/java/org/springframework/messaging/SubscribableChannel.java b/spring-context/src/main/java/org/springframework/messaging/SubscribableChannel.java
index 7e87eadb19..e7ff7385c7 100644
--- a/spring-context/src/main/java/org/springframework/messaging/SubscribableChannel.java
+++ b/spring-context/src/main/java/org/springframework/messaging/SubscribableChannel.java
@@ -25,18 +25,16 @@ package org.springframework.messaging;
  * @author Mark Fisher
  * @since 4.0
  */
-@SuppressWarnings("rawtypes")
-public interface SubscribableChannel>
-		extends MessageChannel {
+public interface SubscribableChannel extends MessageChannel {
 
 	/**
 	 * Register a {@link MessageHandler} as a subscriber to this channel.
 	 */
-	boolean subscribe(H handler);
+	boolean subscribe(MessageHandler handler);
 
 	/**
 	 * Remove a {@link MessageHandler} from the subscribers of this channel.
 	 */
-	boolean unsubscribe(H handler);
+	boolean unsubscribe(MessageHandler handler);
 
 }
diff --git a/spring-context/src/main/java/org/springframework/messaging/support/GenericMessageFactory.java b/spring-context/src/main/java/org/springframework/messaging/support/ErrorMessage.java
similarity index 57%
rename from spring-context/src/main/java/org/springframework/messaging/support/GenericMessageFactory.java
rename to spring-context/src/main/java/org/springframework/messaging/support/ErrorMessage.java
index 30f80823fa..135d7a2a58 100644
--- a/spring-context/src/main/java/org/springframework/messaging/support/GenericMessageFactory.java
+++ b/spring-context/src/main/java/org/springframework/messaging/support/ErrorMessage.java
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,21 +18,24 @@ package org.springframework.messaging.support;
 
 import java.util.Map;
 
-import org.springframework.messaging.MessageFactory;
-
-
 /**
- * A {@link MessageFactory} that creates {@link GenericMessage GenericMessages}.
+ * A message implementation that accepts a {@link Throwable} payload.
+ * Once created this object is immutable.
  *
- * @author Andy Wilkinson
+ * @author Mark Fisher
+ * @author Oleg Zhurakousky
  * @since 4.0
  */
-public class GenericMessageFactory implements MessageFactory> {
+public class ErrorMessage extends GenericMessage {
 
+	private static final long serialVersionUID = -5470210965279837728L;
 
-	@Override
-	public 

GenericMessage

createMessage(P payload, Map headers) { - return new GenericMessage

(payload, headers); + public ErrorMessage(Throwable payload) { + super(payload); + } + + public ErrorMessage(Throwable payload, Map headers) { + super(payload, headers); } } diff --git a/spring-context/src/main/java/org/springframework/messaging/support/MessageBuilder.java b/spring-context/src/main/java/org/springframework/messaging/support/MessageBuilder.java index 96d3859f6e..90e529a7e2 100644 --- a/spring-context/src/main/java/org/springframework/messaging/support/MessageBuilder.java +++ b/spring-context/src/main/java/org/springframework/messaging/support/MessageBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,19 +17,25 @@ package org.springframework.messaging.support; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageFactory; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.PatternMatchUtils; import org.springframework.util.StringUtils; /** + * TODO + * * @author Arjen Poutsma * @author Mark Fisher * @author Oleg Zhurakousky @@ -44,46 +50,24 @@ public final class MessageBuilder { private final Message originalMessage; - @SuppressWarnings("rawtypes") - private static volatile MessageFactory messageFactory = null; - + private volatile boolean modified; /** - * A constructor with payload and an optional message to copy headers from. - * This is a private constructor to be invoked from the static factory methods only. - * - * @param payload the message payload, never {@code null} - * @param originalMessage a message to copy from or re-use if no changes are made, can - * be {@code null} + * Private constructor to be invoked from the static factory methods only. */ private MessageBuilder(T payload, Message originalMessage) { - Assert.notNull(payload, "payload is required"); + Assert.notNull(payload, "payload must not be null"); this.payload = payload; this.originalMessage = originalMessage; if (originalMessage != null) { - this.headers.putAll(originalMessage.getHeaders()); + this.copyHeaders(originalMessage.getHeaders()); + this.modified = (!this.payload.equals(originalMessage.getPayload())); } } /** - * Private constructor to be invoked from the static factory methods only. - * - * @param payload the message payload, never {@code null} - * @param originalMessage a message to copy from or re-use if no changes are made, can - * be {@code null} - */ - private MessageBuilder(T payload, Map headers) { - Assert.notNull(payload, "payload is required"); - Assert.notNull(headers, "headers is required"); - this.payload = payload; - this.headers.putAll(headers); - this.originalMessage = null; - } - - /** - * Create a builder for a new {@link Message} instance pre-populated with all of the - * headers copied from the provided message. The payload of the provided Message will - * also be used as the payload for the new message. + * Create a builder for a new {@link Message} instance pre-populated with all of the headers copied from the + * provided message. The payload of the provided Message will also be used as the payload for the new message. * * @param message the Message from which the payload and all headers will be copied */ @@ -99,70 +83,69 @@ public final class MessageBuilder { * @param payload the payload for the new message */ public static MessageBuilder withPayload(T payload) { - MessageBuilder builder = new MessageBuilder(payload, (Message) null); + MessageBuilder builder = new MessageBuilder(payload, null); return builder; } /** - * Set the value for the given header name. If the provided value is null - * the header will be removed. + * Set the value for the given header name. If the provided value is null, the header will be removed. */ public MessageBuilder setHeader(String headerName, Object headerValue) { Assert.isTrue(!this.isReadOnly(headerName), "The '" + headerName + "' header is read-only."); - if (StringUtils.hasLength(headerName)) { - putOrRemove(headerName, headerValue); + if (StringUtils.hasLength(headerName) && !headerName.equals(MessageHeaders.ID) + && !headerName.equals(MessageHeaders.TIMESTAMP)) { + this.verifyType(headerName, headerValue); + if (headerValue == null) { + Object removedValue = this.headers.remove(headerName); + if (removedValue != null) { + this.modified = true; + } + } + else { + Object replacedValue = this.headers.put(headerName, headerValue); + if (!headerValue.equals(replacedValue)) { + this.modified = true; + } + } } return this; } - private boolean isReadOnly(String headerName) { - return MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName); - } - - private void putOrRemove(String headerName, Object headerValue) { - if (headerValue == null) { - this.headers.remove(headerName); - } - else { - this.headers.put(headerName, headerValue); - } - } - /** - * Set the value for the given header name only if the header name is not already - * associated with a value. + * Set the value for the given header name only if the header name is not already associated with a value. */ public MessageBuilder setHeaderIfAbsent(String headerName, Object headerValue) { if (this.headers.get(headerName) == null) { - putOrRemove(headerName, headerValue); + this.setHeader(headerName, headerValue); } return this; } /** - * Removes all headers provided via array of 'headerPatterns'. As the name suggests - * the array may contain simple matching patterns for header names. Supported pattern - * styles are: "xxx*", "*xxx", "*xxx*" and "xxx*yyy". + * Removes all headers provided via array of 'headerPatterns'. As the name suggests the array + * may contain simple matching patterns for header names. Supported pattern styles are: + * "xxx*", "*xxx", "*xxx*" and "xxx*yyy". + * + * @param headerPatterns */ public MessageBuilder removeHeaders(String... headerPatterns) { - List toRemove = new ArrayList(); + List headersToRemove = new ArrayList(); for (String pattern : headerPatterns) { if (StringUtils.hasLength(pattern)){ if (pattern.contains("*")){ for (String headerName : this.headers.keySet()) { if (PatternMatchUtils.simpleMatch(pattern, headerName)){ - toRemove.add(headerName); + headersToRemove.add(headerName); } } } else { - toRemove.add(pattern); + headersToRemove.add(pattern); } } } - for (String headerName : toRemove) { - this.headers.remove(headerName); - putOrRemove(headerName, null); + for (String headerToRemove : headersToRemove) { + this.removeHeader(headerToRemove); } return this; } @@ -170,63 +153,182 @@ public final class MessageBuilder { * Remove the value for the given header name. */ public MessageBuilder removeHeader(String headerName) { - if (StringUtils.hasLength(headerName) && !isReadOnly(headerName)) { - this.headers.remove(headerName); + if (StringUtils.hasLength(headerName) && !headerName.equals(MessageHeaders.ID) + && !headerName.equals(MessageHeaders.TIMESTAMP)) { + Object removedValue = this.headers.remove(headerName); + if (removedValue != null) { + this.modified = true; + } } return this; } /** - * Copy the name-value pairs from the provided Map. This operation will overwrite any - * existing values. Use { {@link #copyHeadersIfAbsent(Map)} to avoid overwriting - * values. Note that the 'id' and 'timestamp' header values will never be overwritten. + * Copy the name-value pairs from the provided Map. This operation will overwrite any existing values. Use { + * {@link #copyHeadersIfAbsent(Map)} to avoid overwriting values. Note that the 'id' and 'timestamp' header values + * will never be overwritten. + * + * @see MessageHeaders#ID + * @see MessageHeaders#TIMESTAMP */ public MessageBuilder copyHeaders(Map headersToCopy) { Set keys = headersToCopy.keySet(); for (String key : keys) { if (!this.isReadOnly(key)) { - putOrRemove(key, headersToCopy.get(key)); + this.setHeader(key, headersToCopy.get(key)); } } return this; } /** - * Copy the name-value pairs from the provided Map. This operation will not - * overwrite any existing values. + * Copy the name-value pairs from the provided Map. This operation will not overwrite any existing values. */ public MessageBuilder copyHeadersIfAbsent(Map headersToCopy) { Set keys = headersToCopy.keySet(); for (String key : keys) { - if (!this.isReadOnly(key) && (this.headers.get(key) == null)) { - putOrRemove(key, headersToCopy.get(key)); + if (!this.isReadOnly(key)) { + this.setHeaderIfAbsent(key, headersToCopy.get(key)); } } return this; } - @SuppressWarnings("unchecked") - public Message build() { + public MessageBuilder setExpirationDate(Long expirationDate) { + return this.setHeader(MessageHeaders.EXPIRATION_DATE, expirationDate); + } - if (this.originalMessage != null - && this.headers.equals(this.originalMessage.getHeaders()) - && this.payload.equals(this.originalMessage.getPayload())) { - - return this.originalMessage; - } - -// if (this.payload instanceof Throwable) { -// return (Message) new ErrorMessage((Throwable) this.payload, this.headers); -// } - - this.headers.remove(MessageHeaders.ID); - this.headers.remove(MessageHeaders.TIMESTAMP); - - if (messageFactory == null) { - return new GenericMessage(this.payload, this.headers); + public MessageBuilder setExpirationDate(Date expirationDate) { + if (expirationDate != null) { + return this.setHeader(MessageHeaders.EXPIRATION_DATE, expirationDate.getTime()); } else { - return messageFactory.createMessage(payload, headers); + return this.setHeader(MessageHeaders.EXPIRATION_DATE, null); + } + } + + public MessageBuilder setCorrelationId(Object correlationId) { + return this.setHeader(MessageHeaders.CORRELATION_ID, correlationId); + } + + public MessageBuilder pushSequenceDetails(Object correlationId, int sequenceNumber, int sequenceSize) { + Object incomingCorrelationId = headers.get(MessageHeaders.CORRELATION_ID); + @SuppressWarnings("unchecked") + List> incomingSequenceDetails = (List>) headers.get(MessageHeaders.SEQUENCE_DETAILS); + if (incomingCorrelationId != null) { + if (incomingSequenceDetails == null) { + incomingSequenceDetails = new ArrayList>(); + } + else { + incomingSequenceDetails = new ArrayList>(incomingSequenceDetails); + } + incomingSequenceDetails.add(Arrays.asList(incomingCorrelationId, + headers.get(MessageHeaders.SEQUENCE_NUMBER), headers.get(MessageHeaders.SEQUENCE_SIZE))); + incomingSequenceDetails = Collections.unmodifiableList(incomingSequenceDetails); + } + if (incomingSequenceDetails != null) { + setHeader(MessageHeaders.SEQUENCE_DETAILS, incomingSequenceDetails); + } + return setCorrelationId(correlationId).setSequenceNumber(sequenceNumber).setSequenceSize(sequenceSize); + } + + public MessageBuilder popSequenceDetails() { + String key = MessageHeaders.SEQUENCE_DETAILS; + if (!headers.containsKey(key)) { + return this; + } + @SuppressWarnings("unchecked") + List> incomingSequenceDetails = new ArrayList>((List>) headers.get(key)); + List sequenceDetails = incomingSequenceDetails.remove(incomingSequenceDetails.size() - 1); + Assert.state(sequenceDetails.size() == 3, "Wrong sequence details (not created by MessageBuilder?): " + + sequenceDetails); + setCorrelationId(sequenceDetails.get(0)); + Integer sequenceNumber = (Integer) sequenceDetails.get(1); + Integer sequenceSize = (Integer) sequenceDetails.get(2); + if (sequenceNumber != null) { + setSequenceNumber(sequenceNumber); + } + if (sequenceSize != null) { + setSequenceSize(sequenceSize); + } + if (!incomingSequenceDetails.isEmpty()) { + headers.put(MessageHeaders.SEQUENCE_DETAILS, incomingSequenceDetails); + } + else { + headers.remove(MessageHeaders.SEQUENCE_DETAILS); + } + return this; + } + + public MessageBuilder setReplyChannel(MessageChannel replyChannel) { + return this.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel); + } + + public MessageBuilder setReplyChannelName(String replyChannelName) { + return this.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName); + } + + public MessageBuilder setErrorChannel(MessageChannel errorChannel) { + return this.setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel); + } + + public MessageBuilder setErrorChannelName(String errorChannelName) { + return this.setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName); + } + + public MessageBuilder setSequenceNumber(Integer sequenceNumber) { + return this.setHeader(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber); + } + + public MessageBuilder setSequenceSize(Integer sequenceSize) { + return this.setHeader(MessageHeaders.SEQUENCE_SIZE, sequenceSize); + } + + public MessageBuilder setPriority(Integer priority) { + return this.setHeader(MessageHeaders.PRIORITY, priority); + } + + @SuppressWarnings("unchecked") + public Message build() { + if (!this.modified && this.originalMessage != null) { + return this.originalMessage; + } + if (this.payload instanceof Throwable) { + return (Message) new ErrorMessage((Throwable) this.payload, this.headers); + } + return new GenericMessage(this.payload, this.headers); + } + + private boolean isReadOnly(String headerName) { + return MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName); + } + + private void verifyType(String headerName, Object headerValue) { + if (headerName != null && headerValue != null) { + if (MessageHeaders.ID.equals(headerName)) { + Assert.isTrue(headerValue instanceof UUID, "The '" + headerName + "' header value must be a UUID."); + } + else if (MessageHeaders.TIMESTAMP.equals(headerName)) { + Assert.isTrue(headerValue instanceof Long, "The '" + headerName + "' header value must be a Long."); + } + else if (MessageHeaders.EXPIRATION_DATE.equals(headerName)) { + Assert.isTrue(headerValue instanceof Date || headerValue instanceof Long, "The '" + headerName + + "' header value must be a Date or Long."); + } + else if (MessageHeaders.ERROR_CHANNEL.equals(headerName) + || MessageHeaders.REPLY_CHANNEL.endsWith(headerName)) { + Assert.isTrue(headerValue instanceof MessageChannel || headerValue instanceof String, "The '" + + headerName + "' header value must be a MessageChannel or String."); + } + else if (MessageHeaders.SEQUENCE_NUMBER.equals(headerName) + || MessageHeaders.SEQUENCE_SIZE.equals(headerName)) { + Assert.isTrue(Integer.class.isAssignableFrom(headerValue.getClass()), "The '" + headerName + + "' header value must be an Integer."); + } + else if (MessageHeaders.PRIORITY.equals(headerName)) { + Assert.isTrue(Integer.class.isAssignableFrom(headerValue.getClass()), "The '" + headerName + + "' header value must be an Integer."); + } } } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistry.java b/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistry.java index 6dc6db2bd0..7796902dae 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistry.java @@ -16,8 +16,6 @@ package org.springframework.web.messaging; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; import org.springframework.messaging.SubscribableChannel; @@ -25,24 +23,22 @@ import org.springframework.messaging.SubscribableChannel; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public interface PubSubChannelRegistry> { - +public interface PubSubChannelRegistry { /** * A channel for messaging arriving from clients. */ - SubscribableChannel getClientInputChannel(); + SubscribableChannel getClientInputChannel(); /** * A channel for sending direct messages to a client. The client must be have * previously subscribed to the destination of the message. */ - SubscribableChannel getClientOutputChannel(); + SubscribableChannel getClientOutputChannel(); /** * A channel for broadcasting messages through a message broker. */ - SubscribableChannel getMessageBrokerChannel(); + SubscribableChannel getMessageBrokerChannel(); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/AbstractPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/AbstractPubSubMessageHandler.java index 85e761f0ac..512466579e 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/AbstractPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/AbstractPubSubMessageHandler.java @@ -37,8 +37,7 @@ import org.springframework.web.messaging.support.PubSubHeaderAccesssor; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public abstract class AbstractPubSubMessageHandler implements MessageHandler { +public abstract class AbstractPubSubMessageHandler implements MessageHandler { protected final Log logger = LogFactory.getLog(getClass()); @@ -68,7 +67,7 @@ public abstract class AbstractPubSubMessageHandler implements protected abstract Collection getSupportedMessageTypes(); - protected boolean canHandle(M message, MessageType messageType) { + protected boolean canHandle(Message message, MessageType messageType) { if (!CollectionUtils.isEmpty(getSupportedMessageTypes())) { if (!getSupportedMessageTypes().contains(messageType)) { @@ -79,7 +78,7 @@ public abstract class AbstractPubSubMessageHandler implements return isDestinationAllowed(message); } - protected boolean isDestinationAllowed(M message) { + protected boolean isDestinationAllowed(Message message) { PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message); String destination = headers.getDestination(); @@ -115,7 +114,7 @@ public abstract class AbstractPubSubMessageHandler implements } @Override - public final void handleMessage(M message) throws MessagingException { + public final void handleMessage(Message message) throws MessagingException { PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message); MessageType messageType = headers.getMessageType(); @@ -144,22 +143,22 @@ public abstract class AbstractPubSubMessageHandler implements } } - protected void handleConnect(M message) { + protected void handleConnect(Message message) { } - protected void handlePublish(M message) { + protected void handlePublish(Message message) { } - protected void handleSubscribe(M message) { + protected void handleSubscribe(Message message) { } - protected void handleUnsubscribe(M message) { + protected void handleUnsubscribe(Message message) { } - protected void handleDisconnect(M message) { + protected void handleDisconnect(Message message) { } - protected void handleOther(M message) { + protected void handleOther(Message message) { } } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java index 89e82e697c..8cf3ef3da9 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java @@ -44,10 +44,9 @@ import reactor.fn.selector.ObjectSelector; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler { +public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler { - private MessageChannel clientChannel; + private MessageChannel clientChannel; private final Reactor reactor; @@ -56,7 +55,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubS private Map>> subscriptionsBySession = new ConcurrentHashMap>>(); - public ReactorPubSubMessageHandler(PubSubChannelRegistry registry, Reactor reactor) { + public ReactorPubSubMessageHandler(PubSubChannelRegistry registry, Reactor reactor) { Assert.notNull(reactor, "reactor is required"); this.clientChannel = registry.getClientOutputChannel(); this.reactor = reactor; @@ -73,7 +72,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubS } @Override - public void handleSubscribe(M message) { + public void handleSubscribe(Message message) { if (logger.isDebugEnabled()) { logger.debug("Subscribe " + message); @@ -100,7 +99,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubS } @Override - public void handlePublish(M message) { + public void handlePublish(Message message) { if (logger.isDebugEnabled()) { logger.debug("Message received: " + message); @@ -110,8 +109,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubS // Convert to byte[] payload before the fan-out PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message); byte[] payload = payloadConverter.convertToPayload(message.getPayload(), headers.getContentType()); - @SuppressWarnings("unchecked") - M m = (M) MessageBuilder.withPayload(payload).copyHeaders(message.getHeaders()).build(); + Message m = MessageBuilder.withPayload(payload).copyHeaders(message.getHeaders()).build(); this.reactor.notify(getPublishKey(headers.getDestination()), Event.wrap(m)); } @@ -121,7 +119,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubS } @Override - public void handleDisconnect(M message) { + public void handleDisconnect(Message message) { PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message); removeSubscriptions(headers.getSessionId()); } @@ -154,8 +152,8 @@ public class ReactorPubSubMessageHandler extends AbstractPubS PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(sentMessage); headers.setSubscriptionId(this.subscriptionId); - @SuppressWarnings("unchecked") - M clientMessage = (M) MessageBuilder.withPayload(sentMessage.getPayload()).copyHeaders(headers.toHeaders()).build(); + Message clientMessage = MessageBuilder.withPayload( + sentMessage.getPayload()).copyHeaders(headers.toHeaders()).build(); clientChannel.send(clientMessage); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java index b0cf69946d..04461a7b93 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java @@ -54,11 +54,10 @@ import org.springframework.web.method.HandlerMethodSelector; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler +public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler implements ApplicationContextAware, InitializingBean { - private PubSubChannelRegistry registry; + private PubSubChannelRegistry registry; private List messageConverters; @@ -73,12 +72,12 @@ public class AnnotationPubSubMessageHandler extends AbstractP private final Map, MessageExceptionHandlerMethodResolver> exceptionHandlerCache = new ConcurrentHashMap, MessageExceptionHandlerMethodResolver>(64); - private ArgumentResolverComposite argumentResolvers = new ArgumentResolverComposite(); + private ArgumentResolverComposite argumentResolvers = new ArgumentResolverComposite(); - private ReturnValueHandlerComposite returnValueHandlers = new ReturnValueHandlerComposite(); + private ReturnValueHandlerComposite returnValueHandlers = new ReturnValueHandlerComposite(); - public AnnotationPubSubMessageHandler(PubSubChannelRegistry registry) { + public AnnotationPubSubMessageHandler(PubSubChannelRegistry registry) { Assert.notNull(registry, "registry is required"); this.registry = registry; } @@ -102,10 +101,10 @@ public class AnnotationPubSubMessageHandler extends AbstractP initHandlerMethods(); - this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(this.registry.getMessageBrokerChannel())); - this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters)); + this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(this.registry.getMessageBrokerChannel())); + this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters)); - this.returnValueHandlers.addHandler(new MessageReturnValueHandler(this.registry.getClientOutputChannel())); + this.returnValueHandlers.addHandler(new MessageReturnValueHandler(this.registry.getClientOutputChannel())); } protected void initHandlerMethods() { @@ -139,9 +138,8 @@ public class AnnotationPubSubMessageHandler extends AbstractP new UnsubscribeMappingInfoCreator(), this.unsubscribeMethods); } - @SuppressWarnings("unchecked") private void initHandlerMethods(Object handler, Class handlerType, - final Class annotationType, MappingInfoCreator mappingInfoCreator, + final Class annotationType, MappingInfoCreator mappingInfoCreator, Map handlerMethods) { Set messageMethods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() { @@ -171,21 +169,21 @@ public class AnnotationPubSubMessageHandler extends AbstractP } @Override - public void handlePublish(M message) { + public void handlePublish(Message message) { handleMessageInternal(message, this.messageMethods); } @Override - public void handleSubscribe(M message) { + public void handleSubscribe(Message message) { handleMessageInternal(message, this.subscribeMethods); } @Override - public void handleUnsubscribe(M message) { + public void handleUnsubscribe(Message message) { handleMessageInternal(message, this.unsubscribeMethods); } - private void handleMessageInternal(final M message, Map handlerMethods) { + private void handleMessageInternal(final Message message, Map handlerMethods) { PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message); String destination = headers.getDestination(); @@ -198,7 +196,7 @@ public class AnnotationPubSubMessageHandler extends AbstractP HandlerMethod handlerMethod = match.createWithResolvedBean(); // TODO: avoid re-creating invocableHandlerMethod - InvocableMessageHandlerMethod invocableHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod); + InvocableMessageHandlerMethod invocableHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod); invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers); try { @@ -225,9 +223,9 @@ public class AnnotationPubSubMessageHandler extends AbstractP } } - private void invokeExceptionHandler(M message, HandlerMethod handlerMethod, Exception ex) { + private void invokeExceptionHandler(Message message, HandlerMethod handlerMethod, Exception ex) { - InvocableMessageHandlerMethod invocableHandlerMethod; + InvocableMessageHandlerMethod invocableHandlerMethod; Class beanType = handlerMethod.getBeanType(); MessageExceptionHandlerMethodResolver resolver = this.exceptionHandlerCache.get(beanType); if (resolver == null) { @@ -241,7 +239,7 @@ public class AnnotationPubSubMessageHandler extends AbstractP return; } - invocableHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod.getBean(), method); + invocableHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod.getBean(), method); invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers); try { diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolver.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolver.java index ac8ee3b97a..b54b3be830 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolver.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolver.java @@ -27,8 +27,7 @@ import org.springframework.messaging.Message; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public interface ArgumentResolver { +public interface ArgumentResolver { /** * Whether the given {@linkplain MethodParameter method parameter} is @@ -54,6 +53,6 @@ public interface ArgumentResolver { * * @throws Exception in case of errors with the preparation of argument values */ - Object resolveArgument(MethodParameter parameter, M message) throws Exception; + Object resolveArgument(MethodParameter parameter, Message message) throws Exception; } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolverComposite.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolverComposite.java index a3303b3e6d..f02ad6f191 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolverComposite.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolverComposite.java @@ -36,21 +36,20 @@ import org.springframework.util.Assert; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class ArgumentResolverComposite implements ArgumentResolver { +public class ArgumentResolverComposite implements ArgumentResolver { protected final Log logger = LogFactory.getLog(getClass()); - private final List> argumentResolvers = new LinkedList>(); + private final List argumentResolvers = new LinkedList(); - private final Map> argumentResolverCache = - new ConcurrentHashMap>(256); + private final Map argumentResolverCache = + new ConcurrentHashMap(256); /** * Return a read-only list with the contained resolvers, or an empty list. */ - public List> getResolvers() { + public List getResolvers() { return Collections.unmodifiableList(this.argumentResolvers); } @@ -68,9 +67,9 @@ public class ArgumentResolverComposite implements ArgumentRes * @exception IllegalStateException if no suitable {@link ArgumentResolver} is found. */ @Override - public Object resolveArgument(MethodParameter parameter, M message) throws Exception { + public Object resolveArgument(MethodParameter parameter, Message message) throws Exception { - ArgumentResolver resolver = getArgumentResolver(parameter); + ArgumentResolver resolver = getArgumentResolver(parameter); Assert.notNull(resolver, "Unknown parameter type [" + parameter.getParameterType().getName() + "]"); return resolver.resolveArgument(parameter, message); } @@ -78,10 +77,10 @@ public class ArgumentResolverComposite implements ArgumentRes /** * Find a registered {@link ArgumentResolver} that supports the given method parameter. */ - private ArgumentResolver getArgumentResolver(MethodParameter parameter) { - ArgumentResolver result = this.argumentResolverCache.get(parameter); + private ArgumentResolver getArgumentResolver(MethodParameter parameter) { + ArgumentResolver result = this.argumentResolverCache.get(parameter); if (result == null) { - for (ArgumentResolver resolver : this.argumentResolvers) { + for (ArgumentResolver resolver : this.argumentResolvers) { if (resolver.supportsParameter(parameter)) { result = resolver; this.argumentResolverCache.put(parameter, result); @@ -95,7 +94,7 @@ public class ArgumentResolverComposite implements ArgumentRes /** * Add the given {@link ArgumentResolver}. */ - public ArgumentResolverComposite addResolver(ArgumentResolver argumentResolver) { + public ArgumentResolverComposite addResolver(ArgumentResolver argumentResolver) { this.argumentResolvers.add(argumentResolver); return this; } @@ -103,9 +102,9 @@ public class ArgumentResolverComposite implements ArgumentRes /** * Add the given {@link ArgumentResolver}s. */ - public ArgumentResolverComposite addResolvers(List> argumentResolvers) { + public ArgumentResolverComposite addResolvers(List argumentResolvers) { if (argumentResolvers != null) { - for (ArgumentResolver resolver : argumentResolvers) { + for (ArgumentResolver resolver : argumentResolvers) { this.argumentResolvers.add(resolver); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/InvocableMessageHandlerMethod.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/InvocableMessageHandlerMethod.java index 101bf9f2ac..252c440242 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/InvocableMessageHandlerMethod.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/InvocableMessageHandlerMethod.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,10 +44,9 @@ import org.springframework.web.method.HandlerMethod; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class InvocableMessageHandlerMethod extends HandlerMethod { +public class InvocableMessageHandlerMethod extends HandlerMethod { - private ArgumentResolverComposite argumentResolvers = new ArgumentResolverComposite(); + private ArgumentResolverComposite argumentResolvers = new ArgumentResolverComposite(); private ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer(); @@ -85,7 +84,7 @@ public class InvocableMessageHandlerMethod extends HandlerMet * Set {@link ArgumentResolver}s to use to use for resolving method * argument values. */ - public void setMessageMethodArgumentResolvers(ArgumentResolverComposite argumentResolvers) { + public void setMessageMethodArgumentResolvers(ArgumentResolverComposite argumentResolvers) { this.argumentResolvers = argumentResolvers; } @@ -107,7 +106,7 @@ public class InvocableMessageHandlerMethod extends HandlerMet * @exception Exception raised if no suitable argument resolver can be found, or the * method raised an exception */ - public final Object invoke(M message, Object... providedArgs) throws Exception { + public final Object invoke(Message message, Object... providedArgs) throws Exception { Object[] args = getMethodArgumentValues(message, providedArgs); @@ -130,7 +129,7 @@ public class InvocableMessageHandlerMethod extends HandlerMet /** * Get the method argument values for the current request. */ - private Object[] getMethodArgumentValues(M message, Object... providedArgs) throws Exception { + private Object[] getMethodArgumentValues(Message message, Object... providedArgs) throws Exception { MethodParameter[] parameters = getMethodParameters(); Object[] args = new Object[parameters.length]; diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageBodyArgumentResolver.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageBodyArgumentResolver.java index 1a87b50312..a8cc3a03ae 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageBodyArgumentResolver.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageBodyArgumentResolver.java @@ -32,8 +32,7 @@ import org.springframework.web.messaging.support.PubSubHeaderAccesssor; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class MessageBodyArgumentResolver implements ArgumentResolver { +public class MessageBodyArgumentResolver implements ArgumentResolver { private final MessageConverter converter; @@ -48,7 +47,7 @@ public class MessageBodyArgumentResolver implements ArgumentR } @Override - public Object resolveArgument(MethodParameter parameter, M message) throws Exception { + public Object resolveArgument(MethodParameter parameter, Message message) throws Exception { Object arg = null; diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageChannelArgumentResolver.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageChannelArgumentResolver.java index b79b774da8..a13398b93a 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageChannelArgumentResolver.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageChannelArgumentResolver.java @@ -26,13 +26,12 @@ import org.springframework.util.Assert; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class MessageChannelArgumentResolver implements ArgumentResolver { +public class MessageChannelArgumentResolver implements ArgumentResolver { - private MessageChannel messageBrokerChannel; + private MessageChannel messageBrokerChannel; - public MessageChannelArgumentResolver(MessageChannel messageBrokerChannel) { + public MessageChannelArgumentResolver(MessageChannel messageBrokerChannel) { Assert.notNull(messageBrokerChannel, "messageBrokerChannel is required"); this.messageBrokerChannel = messageBrokerChannel; } @@ -43,7 +42,7 @@ public class MessageChannelArgumentResolver implements Argume } @Override - public Object resolveArgument(MethodParameter parameter, M message) throws Exception { + public Object resolveArgument(MethodParameter parameter, Message message) throws Exception { return this.messageBrokerChannel; } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java index caecd29b97..a9cbb09539 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java @@ -16,8 +16,6 @@ package org.springframework.web.messaging.service.method; -import java.util.Map; - import org.springframework.core.MethodParameter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -30,13 +28,12 @@ import org.springframework.web.messaging.support.PubSubHeaderAccesssor; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class MessageReturnValueHandler implements ReturnValueHandler { +public class MessageReturnValueHandler implements ReturnValueHandler { - private MessageChannel clientChannel; + private MessageChannel clientChannel; - public MessageReturnValueHandler(MessageChannel clientChannel) { + public MessageReturnValueHandler(MessageChannel clientChannel) { Assert.notNull(clientChannel, "clientChannel is required"); this.clientChannel = clientChannel; } @@ -59,28 +56,19 @@ public class MessageReturnValueHandler implements ReturnValue } @Override - public void handleReturnValue(Object returnValue, MethodParameter returnType, M message) + public void handleReturnValue(Object returnValue, MethodParameter returnType, Message message) throws Exception { Assert.notNull(this.clientChannel, "No clientChannel to send messages to"); - @SuppressWarnings("unchecked") - M returnMessage = (M) returnValue; - if (returnMessage == null) { + if (message == null) { return; } - returnMessage = processReturnMessage(returnMessage, message); - - this.clientChannel.send(returnMessage); - } - - protected M processReturnMessage(M returnMessage, M message) { - PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message); Assert.notNull(headers.getSubscriptionId(), "No subscription id: " + message); - PubSubHeaderAccesssor returnHeaders = PubSubHeaderAccesssor.wrap(returnMessage); + PubSubHeaderAccesssor returnHeaders = PubSubHeaderAccesssor.wrap(message); returnHeaders.setSessionId(headers.getSessionId()); returnHeaders.setSubscriptionId(headers.getSubscriptionId()); @@ -88,12 +76,11 @@ public class MessageReturnValueHandler implements ReturnValue returnHeaders.setDestination(headers.getDestination()); } - return createMessage(returnMessage.getPayload(), returnHeaders.toHeaders()); - } + Message returnMessage = MessageBuilder.withPayload( + message.getPayload()).copyHeaders(headers.toHeaders()).build(); + + this.clientChannel.send(returnMessage); + } - @SuppressWarnings("unchecked") - private M createMessage(Object payload, Map headers) { - return (M) MessageBuilder.withPayload(payload).copyHeaders(headers).build(); - } } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandler.java index e29761ac00..72f11b611d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandler.java @@ -27,8 +27,7 @@ import org.springframework.messaging.Message; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public interface ReturnValueHandler { +public interface ReturnValueHandler { /** * Whether the given {@linkplain MethodParameter method return type} is @@ -51,6 +50,6 @@ public interface ReturnValueHandler { * @param message the message that caused this method to be called * @throws Exception if the return value handling results in an error */ - void handleReturnValue(Object returnValue, MethodParameter returnType, M message) throws Exception; + void handleReturnValue(Object returnValue, MethodParameter returnType, Message message) throws Exception; } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandlerComposite.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandlerComposite.java index 109bce1299..17bc59f892 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandlerComposite.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandlerComposite.java @@ -28,16 +28,15 @@ import org.springframework.util.Assert; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class ReturnValueHandlerComposite implements ReturnValueHandler { +public class ReturnValueHandlerComposite implements ReturnValueHandler { - private final List> returnValueHandlers = new ArrayList>(); + private final List returnValueHandlers = new ArrayList(); /** * Add the given {@link ReturnValueHandler}. */ - public ReturnValueHandlerComposite addHandler(ReturnValueHandler returnValuehandler) { + public ReturnValueHandlerComposite addHandler(ReturnValueHandler returnValuehandler) { this.returnValueHandlers.add(returnValuehandler); return this; } @@ -45,9 +44,9 @@ public class ReturnValueHandlerComposite implements ReturnVal /** * Add the given {@link ReturnValueHandler}s. */ - public ReturnValueHandlerComposite addHandlers(List> handlers) { + public ReturnValueHandlerComposite addHandlers(List handlers) { if (handlers != null) { - for (ReturnValueHandler handler : handlers) { + for (ReturnValueHandler handler : handlers) { this.returnValueHandlers.add(handler); } } @@ -59,8 +58,8 @@ public class ReturnValueHandlerComposite implements ReturnVal return getReturnValueHandler(returnType) != null; } - private ReturnValueHandler getReturnValueHandler(MethodParameter returnType) { - for (ReturnValueHandler handler : this.returnValueHandlers) { + private ReturnValueHandler getReturnValueHandler(MethodParameter returnType) { + for (ReturnValueHandler handler : this.returnValueHandlers) { if (handler.supportsReturnType(returnType)) { return handler; } @@ -69,10 +68,10 @@ public class ReturnValueHandlerComposite implements ReturnVal } @Override - public void handleReturnValue(Object returnValue, MethodParameter returnType, M message) + public void handleReturnValue(Object returnValue, MethodParameter returnType, Message message) throws Exception { - ReturnValueHandler handler = getReturnValueHandler(returnType); + ReturnValueHandler handler = getReturnValueHandler(returnType); Assert.notNull(handler, "Unknown return value type [" + returnType.getParameterType().getName() + "]"); handler.handleReturnValue(returnValue, returnType, message); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompMessageConverter.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompMessageConverter.java index f212d6735f..1483dd6ae0 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompMessageConverter.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompMessageConverter.java @@ -35,8 +35,7 @@ import org.springframework.web.messaging.stomp.StompConversionException; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class StompMessageConverter { +public class StompMessageConverter { private static final Charset STOMP_CHARSET = Charset.forName("UTF-8"); @@ -49,7 +48,7 @@ public class StompMessageConverter { /** * @param stompContent a complete STOMP message (without the trailing 0x00) as byte[] or String. */ - public M toMessage(Object stompContent, String sessionId) { + public Message toMessage(Object stompContent, String sessionId) { byte[] byteContent = null; if (stompContent instanceof String) { @@ -100,12 +99,7 @@ public class StompMessageConverter { byte[] payload = new byte[totalLength - payloadIndex]; System.arraycopy(byteContent, payloadIndex, payload, 0, totalLength - payloadIndex); - return createMessage(stompHeaders, payload); - } - - @SuppressWarnings("unchecked") - private M createMessage(StompHeaderAccessor stompHeaders, byte[] payload) { - return (M) MessageBuilder.withPayload(payload).copyHeaders(stompHeaders.toHeaders()).build(); + return MessageBuilder.withPayload(payload).copyHeaders(stompHeaders.toHeaders()).build(); } private int findIndexOfPayload(byte[] bytes) { @@ -135,7 +129,7 @@ public class StompMessageConverter { return index; } - public byte[] fromMessage(M message) { + public byte[] fromMessage(Message message) { byte[] payload; if (message.getPayload() instanceof byte[]) { diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java index 42814fe78a..f9e9b3f247 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java @@ -55,15 +55,14 @@ import reactor.tcp.netty.NettyTcpClient; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler +public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler implements SmartLifecycle { private static final String STOMP_RELAY_SYSTEM_SESSION_ID = "stompRelaySystemSessionId"; - private MessageChannel clientChannel; + private MessageChannel clientChannel; - private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); + private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); private MessageConverter payloadConverter; @@ -80,7 +79,7 @@ public class StompRelayPubSubMessageHandler extends AbstractP * @param clientChannel a channel for sending messages from the remote message broker * back to clients */ - public StompRelayPubSubMessageHandler(PubSubChannelRegistry registry) { + public StompRelayPubSubMessageHandler(PubSubChannelRegistry registry) { Assert.notNull(registry, "registry is required"); this.clientChannel = registry.getClientOutputChannel(); this.payloadConverter = new CompositeMessageConverter(null); @@ -129,12 +128,12 @@ public class StompRelayPubSubMessageHandler extends AbstractP headers.setLogin("guest"); headers.setPasscode("guest"); headers.setHeartbeat(0, 0); - @SuppressWarnings("unchecked") - M message = (M) MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toStompMessageHeaders()).build(); + Message message = MessageBuilder.withPayload( + new byte[0]).copyHeaders(headers.toStompMessageHeaders()).build(); RelaySession session = new RelaySession(message, headers) { @Override - protected void sendMessageToClient(M message) { + protected void sendMessageToClient(Message message) { // TODO: check for ERROR frame (reconnect?) } }; @@ -166,7 +165,7 @@ public class StompRelayPubSubMessageHandler extends AbstractP } @Override - public void handleConnect(M message) { + public void handleConnect(Message message) { StompHeaderAccessor stompHeaders = StompHeaderAccessor.wrap(message); String sessionId = stompHeaders.getSessionId(); if (sessionId == null) { @@ -178,22 +177,22 @@ public class StompRelayPubSubMessageHandler extends AbstractP } @Override - public void handlePublish(M message) { + public void handlePublish(Message message) { forwardMessage(message, StompCommand.SEND); } @Override - public void handleSubscribe(M message) { + public void handleSubscribe(Message message) { forwardMessage(message, StompCommand.SUBSCRIBE); } @Override - public void handleUnsubscribe(M message) { + public void handleUnsubscribe(Message message) { forwardMessage(message, StompCommand.UNSUBSCRIBE); } @Override - public void handleDisconnect(M message) { + public void handleDisconnect(Message message) { StompHeaderAccessor stompHeaders = StompHeaderAccessor.wrap(message); if (stompHeaders.getStompCommand() != null) { forwardMessage(message, StompCommand.DISCONNECT); @@ -206,13 +205,13 @@ public class StompRelayPubSubMessageHandler extends AbstractP } @Override - public void handleOther(M message) { + public void handleOther(Message message) { StompCommand command = (StompCommand) message.getHeaders().get(PubSubHeaderAccesssor.PROTOCOL_MESSAGE_TYPE); Assert.notNull(command, "Expected STOMP command: " + message.getHeaders()); forwardMessage(message, command); } - private void forwardMessage(M message, StompCommand command) { + private void forwardMessage(Message message, StompCommand command) { StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); headers.setStompCommandIfNotSet(command); @@ -249,14 +248,14 @@ public class StompRelayPubSubMessageHandler extends AbstractP private final Promise> promise; - private final BlockingQueue messageQueue = new LinkedBlockingQueue(50); + private final BlockingQueue> messageQueue = new LinkedBlockingQueue>(50); private final Object monitor = new Object(); private volatile boolean isConnected = false; - public RelaySession(final M message, final StompHeaderAccessor stompHeaders) { + public RelaySession(final Message message, final StompHeaderAccessor stompHeaders) { Assert.notNull(message, "message is required"); Assert.notNull(stompHeaders, "stompHeaders is required"); @@ -297,7 +296,7 @@ public class StompRelayPubSubMessageHandler extends AbstractP return; } - M message = stompMessageConverter.toMessage(stompFrame, this.sessionId); + Message message = stompMessageConverter.toMessage(stompFrame, this.sessionId); if (logger.isTraceEnabled()) { logger.trace("Reading message " + message); } @@ -319,7 +318,7 @@ public class StompRelayPubSubMessageHandler extends AbstractP sendMessageToClient(message); } - protected void sendMessageToClient(M message) { + protected void sendMessageToClient(Message message) { clientChannel.send(message); } @@ -327,12 +326,11 @@ public class StompRelayPubSubMessageHandler extends AbstractP StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); headers.setSessionId(sessionId); headers.setMessage(errorText); - @SuppressWarnings("unchecked") - M errorMessage = (M) MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build(); + Message errorMessage = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build(); sendMessageToClient(errorMessage); } - public void forward(M message, StompHeaderAccessor headers) { + public void forward(Message message, StompHeaderAccessor headers) { if (!this.isConnected) { synchronized(this.monitor) { @@ -358,7 +356,7 @@ public class StompRelayPubSubMessageHandler extends AbstractP } private void flushMessages(TcpConnection connection) { - List messages = new ArrayList(); + List> messages = new ArrayList>(); this.messageQueue.drainTo(messages); for (Message message : messages) { StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); @@ -374,8 +372,7 @@ public class StompRelayPubSubMessageHandler extends AbstractP MediaType contentType = headers.getContentType(); byte[] payload = payloadConverter.convertToPayload(message.getPayload(), contentType); - @SuppressWarnings("unchecked") - M byteMessage = (M) MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build(); + Message byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build(); if (logger.isTraceEnabled()) { logger.trace("Forwarding message " + byteMessage); diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java index 58b46c0e3e..828c4a879b 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java @@ -51,24 +51,22 @@ import reactor.util.Assert; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class StompWebSocketHandler extends TextWebSocketHandlerAdapter - implements MessageHandler { +public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implements MessageHandler { private static final byte[] EMPTY_PAYLOAD = new byte[0]; private static Log logger = LogFactory.getLog(StompWebSocketHandler.class); - private MessageChannel outputChannel; + private MessageChannel outputChannel; - private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); + private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); private final Map sessionInfos = new ConcurrentHashMap(); private MessageConverter payloadConverter = new CompositeMessageConverter(null); - public StompWebSocketHandler(PubSubChannelRegistry registry) { + public StompWebSocketHandler(PubSubChannelRegistry registry) { Assert.notNull(registry, "registry is required"); this.outputChannel = registry.getClientInputChannel(); } @@ -77,7 +75,7 @@ public class StompWebSocketHandler extends TextWebSocketHandl this.payloadConverter = new CompositeMessageConverter(converters); } - public StompMessageConverter getStompMessageConverter() { + public StompMessageConverter getStompMessageConverter() { return this.stompMessageConverter; } @@ -95,7 +93,7 @@ public class StompWebSocketHandler extends TextWebSocketHandl protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) { try { String payload = textMessage.getPayload(); - M message = this.stompMessageConverter.toMessage(payload, session.getId()); + Message message = this.stompMessageConverter.toMessage(payload, session.getId()); // TODO: validate size limits // http://stomp.github.io/stomp-specification-1.2.html#Size_Limits @@ -138,7 +136,7 @@ public class StompWebSocketHandler extends TextWebSocketHandl } } - protected void handleConnect(final WebSocketSession session, M message) throws IOException { + protected void handleConnect(final WebSocketSession session, Message message) throws IOException { StompHeaderAccessor connectHeaders = StompHeaderAccessor.wrap(message); StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); @@ -160,17 +158,16 @@ public class StompWebSocketHandler extends TextWebSocketHandl // TODO: security - @SuppressWarnings("unchecked") - M connectedMessage = (M) MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders( + Message connectedMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders( connectedHeaders.toHeaders()).build(); byte[] bytes = getStompMessageConverter().fromMessage(connectedMessage); session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8")))); } - protected void handlePublish(M stompMessage) { + protected void handlePublish(Message stompMessage) { } - protected void handleSubscribe(M message) { + protected void handleSubscribe(Message message) { // TODO: need a way to communicate back if subscription was successfully created or // not in which case an ERROR should be sent back and close the connection @@ -184,13 +181,13 @@ public class StompWebSocketHandler extends TextWebSocketHandl sessionInfo.addSubscription(destination, headers.getSubscriptionId()); } - protected void handleUnsubscribe(M message) { + protected void handleUnsubscribe(Message message) { // TODO: remove subscription } - protected void handleDisconnect(M stompMessage) { + protected void handleDisconnect(Message stompMessage) { } protected void sendErrorMessage(WebSocketSession session, Throwable error) { @@ -198,8 +195,7 @@ public class StompWebSocketHandler extends TextWebSocketHandl StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); headers.setMessage(error.getMessage()); - @SuppressWarnings("unchecked") - M message = (M) MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(headers.toHeaders()).build(); + Message message = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(headers.toHeaders()).build(); byte[] bytes = this.stompMessageConverter.fromMessage(message); try { @@ -215,8 +211,7 @@ public class StompWebSocketHandler extends TextWebSocketHandl this.sessionInfos.remove(session.getId()); PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.create(MessageType.DISCONNECT); headers.setSessionId(session.getId()); - @SuppressWarnings("unchecked") - M message = (M) MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build(); + Message message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build(); this.outputChannel.send(message); } @@ -224,7 +219,7 @@ public class StompWebSocketHandler extends TextWebSocketHandl * Handle STOMP messages going back out to WebSocket clients. */ @Override - public void handleMessage(M message) { + public void handleMessage(Message message) { StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); headers.setStompCommandIfNotSet(StompCommand.MESSAGE); @@ -269,8 +264,7 @@ public class StompWebSocketHandler extends TextWebSocketHandl } try { - @SuppressWarnings("unchecked") - M byteMessage = (M) MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build(); + Message byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build(); byte[] bytes = getStompMessageConverter().fromMessage(byteMessage); session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8")))); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/AbstractPubSubChannelRegistry.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/AbstractPubSubChannelRegistry.java index 7f07da8b46..b16f50c5da 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/support/AbstractPubSubChannelRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/support/AbstractPubSubChannelRegistry.java @@ -17,8 +17,6 @@ package org.springframework.web.messaging.support; import org.springframework.beans.factory.InitializingBean; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; import org.springframework.messaging.SubscribableChannel; import org.springframework.util.Assert; import org.springframework.web.messaging.PubSubChannelRegistry; @@ -28,41 +26,39 @@ import org.springframework.web.messaging.PubSubChannelRegistry; * @author Rossen Stoyanchev * @since 4.0 */ -@SuppressWarnings("rawtypes") -public class AbstractPubSubChannelRegistry> - implements PubSubChannelRegistry, InitializingBean { +public class AbstractPubSubChannelRegistry implements PubSubChannelRegistry, InitializingBean { - private SubscribableChannel clientInputChannel; + private SubscribableChannel clientInputChannel; - private SubscribableChannel clientOutputChannel; + private SubscribableChannel clientOutputChannel; - private SubscribableChannel messageBrokerChannel; + private SubscribableChannel messageBrokerChannel; @Override - public SubscribableChannel getClientInputChannel() { + public SubscribableChannel getClientInputChannel() { return this.clientInputChannel; } - public void setClientInputChannel(SubscribableChannel channel) { + public void setClientInputChannel(SubscribableChannel channel) { this.clientInputChannel = channel; } @Override - public SubscribableChannel getClientOutputChannel() { + public SubscribableChannel getClientOutputChannel() { return this.clientOutputChannel; } - public void setClientOutputChannel(SubscribableChannel channel) { + public void setClientOutputChannel(SubscribableChannel channel) { this.clientOutputChannel = channel; } @Override - public SubscribableChannel getMessageBrokerChannel() { + public SubscribableChannel getMessageBrokerChannel() { return this.messageBrokerChannel; } - public void setMessageBrokerChannel(SubscribableChannel channel) { + public void setMessageBrokerChannel(SubscribableChannel channel) { this.messageBrokerChannel = channel; } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorMessageChannel.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorMessageChannel.java index 318c34ce7b..a7b95400db 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorMessageChannel.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorMessageChannel.java @@ -36,7 +36,7 @@ import reactor.fn.selector.ObjectSelector; * @author Rossen Stoyanchev * @since 4.0 */ -public class ReactorMessageChannel implements SubscribableChannel, MessageHandler>> { +public class ReactorMessageChannel implements SubscribableChannel { private static Log logger = LogFactory.getLog(ReactorMessageChannel.class); @@ -47,8 +47,8 @@ public class ReactorMessageChannel implements SubscribableChannel, Me private String name = toString(); // TODO - private final Map>, Registration> registrations = - new HashMap>, Registration>(); + private final Map> registrations = + new HashMap>(); public ReactorMessageChannel(Reactor reactor) { @@ -78,7 +78,7 @@ public class ReactorMessageChannel implements SubscribableChannel, Me } @Override - public boolean subscribe(final MessageHandler> handler) { + public boolean subscribe(final MessageHandler handler) { if (this.registrations.containsKey(handler)) { logger.warn("Channel " + getName() + ", handler already subscribed " + handler); @@ -98,7 +98,7 @@ public class ReactorMessageChannel implements SubscribableChannel, Me } @Override - public boolean unsubscribe(MessageHandler> handler) { + public boolean unsubscribe(MessageHandler handler) { if (logger.isTraceEnabled()) { logger.trace("Channel " + getName() + ", removing subscription for handler " + handler); @@ -119,9 +119,9 @@ public class ReactorMessageChannel implements SubscribableChannel, Me private static final class MessageHandlerConsumer implements Consumer>> { - private final MessageHandler> handler; + private final MessageHandler handler; - private MessageHandlerConsumer(MessageHandler> handler) { + private MessageHandlerConsumer(MessageHandler handler) { this.handler = handler; } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorPubSubChannelRegistry.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorPubSubChannelRegistry.java index 6eefc7a3a0..96e2281932 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorPubSubChannelRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorPubSubChannelRegistry.java @@ -16,8 +16,6 @@ package org.springframework.web.messaging.support; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; import org.springframework.util.Assert; import reactor.core.Reactor; @@ -27,7 +25,7 @@ import reactor.core.Reactor; * @author Rossen Stoyanchev * @since 4.0 */ -public class ReactorPubSubChannelRegistry extends AbstractPubSubChannelRegistry, MessageHandler>> { +public class ReactorPubSubChannelRegistry extends AbstractPubSubChannelRegistry { public ReactorPubSubChannelRegistry(Reactor reactor) {