diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubHeaders.java b/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubHeaders.java index 2aebcabc3ea..7ed9546ba3f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubHeaders.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubHeaders.java @@ -112,6 +112,7 @@ public class PubSubHeaders { */ @SuppressWarnings("unchecked") protected PubSubHeaders(MessageHeaders originalHeaders) { + Assert.notNull(originalHeaders, "originalHeaders is required"); this.originalHeaders = originalHeaders; this.externalSourceHeaders = (originalHeaders.get(EXTERNAL_SOURCE_HEADERS) != null) ? (Map>) originalHeaders.get(EXTERNAL_SOURCE_HEADERS) : emptyMultiValueMap; diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/StompHeaders.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/StompHeaders.java index 20926a9b4e0..e8c9289172f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/StompHeaders.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/StompHeaders.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.springframework.http.MediaType; import org.springframework.messaging.Message; @@ -47,7 +48,7 @@ import reactor.util.Assert; */ public class StompHeaders extends PubSubHeaders { - private static final String ID = "id"; + private static final String STOMP_ID = "id"; private static final String HOST = "host"; @@ -78,6 +79,8 @@ public class StompHeaders extends PubSubHeaders { private static final String STOMP_HEADERS = "stompHeaders"; + private static final AtomicLong messageIdCounter = new AtomicLong(); + private final Map headers; @@ -105,8 +108,8 @@ public class StompHeaders extends PubSubHeaders { super.setContentType(MediaType.parseMediaType(contentType)); } if (StompCommand.SUBSCRIBE.equals(getStompCommand())) { - if (getHeaderValue(ID) != null) { - super.setSubscriptionId(getHeaderValue(ID)); + if (getHeaderValue(STOMP_ID) != null) { + super.setSubscriptionId(getHeaderValue(STOMP_ID)); } } } @@ -192,7 +195,7 @@ public class StompHeaders extends PubSubHeaders { logger.warn("STOMP MESSAGE frame should have a subscription: " + this.toString()); } if ((getMessageId() == null)) { - this.headers.put(MESSAGE_ID, toMessageHeaders().get(ID).toString()); + result.set(MESSAGE_ID, getSessionId() + "-" + messageIdCounter.getAndIncrement()); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java index 0c54d956edc..7886ab9cb52 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java @@ -211,17 +211,20 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter { String sessionId = stompHeaders.getSessionId(); if (sessionId == null) { - logger.error("Cannot send message without a session id: " + message); + logger.error("No \"sessionId\" header in message: " + message); } WebSocketSession session = getWebSocketSession(sessionId); + if (session == null) { + logger.error("Session not found: " + message); + } byte[] payload; try { MediaType contentType = stompHeaders.getContentType(); payload = payloadConverter.convertToPayload(message.getPayload(), contentType); } - catch (Exception e) { - logger.error("Failed to send " + message, e); + catch (Throwable t) { + logger.error("Failed to send " + message, t); return; } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/DestinationMessage.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/DestinationMessage.java deleted file mode 100644 index 8daa9b37ec9..00000000000 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/support/DestinationMessage.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.web.messaging.support; - -import java.util.Map; - -import org.springframework.messaging.GenericMessage; - - -/** - * - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class DestinationMessage extends GenericMessage { - - - public DestinationMessage(T payload, Map headers) { - super(payload, headers); - } - - public DestinationMessage(T payload) { - super(payload); - } - - - -}