Fix setting of message-id STOMP header

This commit is contained in:
Rossen Stoyanchev 2013-06-12 13:39:35 -04:00
parent 82dfd781d0
commit c420f37137
4 changed files with 14 additions and 49 deletions

View File

@ -112,6 +112,7 @@ public class PubSubHeaders {
*/
@SuppressWarnings("unchecked")
protected PubSubHeaders(MessageHeaders originalHeaders) {
Assert.notNull(originalHeaders, "originalHeaders is required");
this.originalHeaders = originalHeaders;
this.externalSourceHeaders = (originalHeaders.get(EXTERNAL_SOURCE_HEADERS) != null) ?
(Map<String, List<String>>) originalHeaders.get(EXTERNAL_SOURCE_HEADERS) : emptyMultiValueMap;

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
@ -47,7 +48,7 @@ import reactor.util.Assert;
*/
public class StompHeaders extends PubSubHeaders {
private static final String ID = "id";
private static final String STOMP_ID = "id";
private static final String HOST = "host";
@ -78,6 +79,8 @@ public class StompHeaders extends PubSubHeaders {
private static final String STOMP_HEADERS = "stompHeaders";
private static final AtomicLong messageIdCounter = new AtomicLong();
private final Map<String, String> headers;
@ -105,8 +108,8 @@ public class StompHeaders extends PubSubHeaders {
super.setContentType(MediaType.parseMediaType(contentType));
}
if (StompCommand.SUBSCRIBE.equals(getStompCommand())) {
if (getHeaderValue(ID) != null) {
super.setSubscriptionId(getHeaderValue(ID));
if (getHeaderValue(STOMP_ID) != null) {
super.setSubscriptionId(getHeaderValue(STOMP_ID));
}
}
}
@ -192,7 +195,7 @@ public class StompHeaders extends PubSubHeaders {
logger.warn("STOMP MESSAGE frame should have a subscription: " + this.toString());
}
if ((getMessageId() == null)) {
this.headers.put(MESSAGE_ID, toMessageHeaders().get(ID).toString());
result.set(MESSAGE_ID, getSessionId() + "-" + messageIdCounter.getAndIncrement());
}
}

View File

@ -211,17 +211,20 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter {
String sessionId = stompHeaders.getSessionId();
if (sessionId == null) {
logger.error("Cannot send message without a session id: " + message);
logger.error("No \"sessionId\" header in message: " + message);
}
WebSocketSession session = getWebSocketSession(sessionId);
if (session == null) {
logger.error("Session not found: " + message);
}
byte[] payload;
try {
MediaType contentType = stompHeaders.getContentType();
payload = payloadConverter.convertToPayload(message.getPayload(), contentType);
}
catch (Exception e) {
logger.error("Failed to send " + message, e);
catch (Throwable t) {
logger.error("Failed to send " + message, t);
return;
}

View File

@ -1,42 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.support;
import java.util.Map;
import org.springframework.messaging.GenericMessage;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class DestinationMessage<T> extends GenericMessage<T> {
public DestinationMessage(T payload, Map<String, Object> headers) {
super(payload, headers);
}
public DestinationMessage(T payload) {
super(payload);
}
}