Polish STOMP codec

Issue: SPR-11088
This commit is contained in:
Rossen Stoyanchev 2013-11-13 13:44:07 -05:00
parent e84885c655
commit 6802f813de
2 changed files with 30 additions and 27 deletions

View File

@ -53,7 +53,8 @@ public class StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]
Message<byte[]> message = DECODER.decode(buffer.byteBuffer()); Message<byte[]> message = DECODER.decode(buffer.byteBuffer());
if (message != null) { if (message != null) {
next.accept(message); next.accept(message);
} else { }
else {
break; break;
} }
} }

View File

@ -29,9 +29,12 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
/** /**
* A decoder for STOMP frames. * Decodes STOMP frames from a {@link ByteBuffer}. If the buffer does not contain
* enough data to form a complete STOMP frame, the buffer is reset and the value
* returned is {@code null} indicating that no message could be read.
* *
* @author Andy Wilkinson * @author Andy Wilkinson
* @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*/ */
public class StompDecoder { public class StompDecoder {
@ -45,15 +48,18 @@ public class StompDecoder {
/** /**
* Decodes a STOMP frame in the given {@code buffer} into a {@link Message}. * Decodes a STOMP frame in the given {@code buffer} into a {@link Message}.
* If the given ByteBuffer contains partial STOMP frame content, the method
* resets the buffer and returns {@code null}.
* *
* @param buffer The buffer to decode the frame from * @param buffer The buffer to decode the frame from
* @return The decoded message *
* @return The decoded message or {@code null}
*/ */
public Message<byte[]> decode(ByteBuffer buffer) { public Message<byte[]> decode(ByteBuffer buffer) {
skipLeadingEol(buffer);
Message<byte[]> decodedMessage = null; Message<byte[]> decodedMessage = null;
skipLeadingEol(buffer);
buffer.mark(); buffer.mark();
String command = readCommand(buffer); String command = readCommand(buffer);
@ -65,34 +71,38 @@ public class StompDecoder {
if (payload != null) { if (payload != null) {
StompCommand stompCommand = StompCommand.valueOf(command); StompCommand stompCommand = StompCommand.valueOf(command);
if ((payload.length > 0) && (!stompCommand.isBodyAllowed())) { if ((payload.length > 0) && (!stompCommand.isBodyAllowed())) {
throw new StompConversionException(stompCommand + throw new StompConversionException(stompCommand + " shouldn't have but " +
" isn't allowed to have a body but has payload length=" + payload.length + "has a payload with length=" + payload.length + ", headers=" + headers);
", headers=" + headers);
} }
decodedMessage = MessageBuilder.withPayload(payload) decodedMessage = MessageBuilder.withPayload(payload)
.setHeaders(StompHeaderAccessor.create(stompCommand, headers)).build(); .setHeaders(StompHeaderAccessor.create(stompCommand, headers)).build();
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Decoded " + decodedMessage); logger.debug("Decoded " + decodedMessage);
} }
} else { }
if (logger.isDebugEnabled()) { else {
logger.debug("Received incomplete frame. Resetting buffer"); if (logger.isTraceEnabled()) {
logger.trace("Received incomplete frame. Resetting buffer");
} }
buffer.reset(); buffer.reset();
} }
} }
else { else {
decodedMessage = MessageBuilder.withPayload(HEARTBEAT_PAYLOAD).setHeaders(
StompHeaderAccessor.create(SimpMessageType.HEARTBEAT)).build();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Decoded heartbeat"); logger.trace("Decoded heartbeat");
} }
decodedMessage = MessageBuilder.withPayload(HEARTBEAT_PAYLOAD).setHeaders(
StompHeaderAccessor.create(SimpMessageType.HEARTBEAT)).build();
} }
return decodedMessage; return decodedMessage;
}
private void skipLeadingEol(ByteBuffer buffer) {
while (true) {
if (!isEol(buffer)) {
break;
}
}
} }
private String readCommand(ByteBuffer buffer) { private String readCommand(ByteBuffer buffer) {
@ -143,17 +153,17 @@ public class StompDecoder {
String contentLengthString = headers.getFirst("content-length"); String contentLengthString = headers.getFirst("content-length");
if (contentLengthString != null) { if (contentLengthString != null) {
int contentLength = Integer.valueOf(contentLengthString); int contentLength = Integer.valueOf(contentLengthString);
byte[] payload = new byte[contentLength];
if (buffer.remaining() > contentLength) { if (buffer.remaining() > contentLength) {
byte[] payload = new byte[contentLength];
buffer.get(payload); buffer.get(payload);
if (buffer.get() != 0) { if (buffer.get() != 0) {
throw new StompConversionException("Frame must be terminated with a null octet"); throw new StompConversionException("Frame must be terminated with a null octet");
} }
} else { return payload;
}
else {
return null; return null;
} }
return payload;
} }
else { else {
ByteArrayOutputStream payload = new ByteArrayOutputStream(); ByteArrayOutputStream payload = new ByteArrayOutputStream();
@ -170,14 +180,6 @@ public class StompDecoder {
return null; return null;
} }
private void skipLeadingEol(ByteBuffer buffer) {
while (true) {
if (!isEol(buffer)) {
break;
}
}
}
private boolean isEol(ByteBuffer buffer) { private boolean isEol(ByteBuffer buffer) {
if (buffer.remaining() > 0) { if (buffer.remaining() > 0) {
byte b = buffer.get(); byte b = buffer.get();