Introduce new HEARTBEAT message type

Previously, there was no generic concept of a message that represents
a heartbeat and the STOMP-specific code used a null STOMP command to
represent a heartbeat.

This commit introduces HEARTBEAT as a new SimpMessageType. The STOMP
support has been updated to create HEARTBEAT messages to represent
heartbeats, and to use the new message type as the mechanism by which
heartbeats are identified.
This commit is contained in:
Andy Wilkinson 2013-10-02 11:30:18 +01:00 committed by Rossen Stoyanchev
parent a7f735b50a
commit 41e411a8a5
5 changed files with 41 additions and 15 deletions

View File

@ -30,6 +30,8 @@ public enum SimpMessageType {
CONNECT_ACK,
HEARTBEAT,
MESSAGE,
SUBSCRIBE,

View File

@ -23,6 +23,7 @@ import java.nio.charset.Charset;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@ -50,27 +51,29 @@ public class StompDecoder {
*/
public Message<byte[]> decode(ByteBuffer buffer) {
skipLeadingEol(buffer);
Message<byte[]> decodedMessage;
String command = readCommand(buffer);
if (command.length() > 0) {
MultiValueMap<String, String> headers = readHeaders(buffer);
byte[] payload = readPayload(buffer, headers);
Message<byte[]> decodedMessage = MessageBuilder.withPayloadAndHeaders(payload,
decodedMessage = MessageBuilder.withPayloadAndHeaders(payload,
StompHeaderAccessor.create(StompCommand.valueOf(command), headers)).build();
if (logger.isTraceEnabled()) {
logger.trace("Decoded " + decodedMessage);
}
return decodedMessage;
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Decoded heartbeat");
}
return MessageBuilder.withPayload(HEARTBEAT_PAYLOAD).build();
decodedMessage = MessageBuilder.withPayloadAndHeaders(HEARTBEAT_PAYLOAD,
StompHeaderAccessor.create(SimpMessageType.HEARTBEAT)).build();
}
if (logger.isTraceEnabled()) {
logger.trace("Decoded " + decodedMessage);
}
return decodedMessage;
}
private String readCommand(ByteBuffer buffer) {

View File

@ -26,6 +26,7 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageType;
/**
* An encoder for STOMP frames
@ -78,7 +79,7 @@ public final class StompEncoder {
}
private boolean isHeartbeat(StompHeaderAccessor headers) {
return headers.getCommand() == null;
return headers.getMessageType() == SimpMessageType.HEARTBEAT;
}
private void writeCommand(StompHeaderAccessor headers, DataOutputStream output) throws IOException {

View File

@ -37,6 +37,7 @@ import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.BrokerAvailabilityEvent;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.util.Assert;
@ -301,7 +302,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
if (StompHeaderAccessor.wrap(message).getCommand() != null) {
if (StompHeaderAccessor.wrap(message).getMessageType() != SimpMessageType.HEARTBEAT) {
synchronized(this.monitor) {
for (MessageExchange exch : this.expected) {
if (exch.matchMessage(message)) {

View File

@ -22,18 +22,19 @@ import java.util.List;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.io.Buffer;
import static org.junit.Assert.*;
/**
* Test fixture for {@link StompCodec}.
*
* @author awilkinson
* @author Andy Wilkinson
*/
public class StompCodecTests {
@ -151,6 +152,24 @@ public class StompCodecTests {
assertEquals(StompCommand.DISCONNECT, StompHeaderAccessor.wrap(messages.get(1)).getCommand());
}
@Test
public void decodeHeartbeat() {
String frame = "\n";
Buffer buffer = Buffer.wrap(frame);
final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
new StompCodec().decoder(new Consumer<Message<byte[]>>() {
@Override
public void accept(Message<byte[]> message) {
messages.add(message);
}
}).apply(buffer);
assertEquals(1, messages.size());
assertEquals(SimpMessageType.HEARTBEAT, StompHeaderAccessor.wrap(messages.get(0)).getMessageType());
}
@Test
public void encodeFrameWithNoHeadersAndNoBody() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);