Update STOMP decoder to handle incomplete frames
Previously, StompDecoder would throw a StompConversionException when it attempted to decode a Buffer that contained an incomplete frame. This commit updates StompDecoder to return null when it encounters an incomplete frame. It also resets the buffer, thereby allowing the decode to be reattempted once more data has been received. StompCodec's decoder function has been updated to stop attempting to decode a Buffer when StompDecoder returns null. Issue: SPR-11088
This commit is contained in:
parent
9600bf07c7
commit
e84885c655
|
@ -53,6 +53,8 @@ public class StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]
|
||||||
Message<byte[]> message = DECODER.decode(buffer.byteBuffer());
|
Message<byte[]> message = DECODER.decode(buffer.byteBuffer());
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
next.accept(message);
|
next.accept(message);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -52,7 +52,9 @@ public class StompDecoder {
|
||||||
public Message<byte[]> decode(ByteBuffer buffer) {
|
public Message<byte[]> decode(ByteBuffer buffer) {
|
||||||
skipLeadingEol(buffer);
|
skipLeadingEol(buffer);
|
||||||
|
|
||||||
Message<byte[]> decodedMessage;
|
Message<byte[]> decodedMessage = null;
|
||||||
|
|
||||||
|
buffer.mark();
|
||||||
|
|
||||||
String command = readCommand(buffer);
|
String command = readCommand(buffer);
|
||||||
|
|
||||||
|
@ -60,18 +62,25 @@ public class StompDecoder {
|
||||||
MultiValueMap<String, String> headers = readHeaders(buffer);
|
MultiValueMap<String, String> headers = readHeaders(buffer);
|
||||||
byte[] payload = readPayload(buffer, headers);
|
byte[] payload = readPayload(buffer, headers);
|
||||||
|
|
||||||
StompCommand stompCommand = StompCommand.valueOf(command);
|
if (payload != null) {
|
||||||
if ((payload.length > 0) && (!stompCommand.isBodyAllowed())) {
|
StompCommand stompCommand = StompCommand.valueOf(command);
|
||||||
throw new StompConversionException(stompCommand +
|
if ((payload.length > 0) && (!stompCommand.isBodyAllowed())) {
|
||||||
" isn't allowed to have a body but has payload length=" + payload.length +
|
throw new StompConversionException(stompCommand +
|
||||||
", headers=" + headers);
|
" isn't allowed to have a body but has payload length=" + payload.length +
|
||||||
}
|
", headers=" + headers);
|
||||||
|
}
|
||||||
|
|
||||||
decodedMessage = MessageBuilder.withPayload(payload)
|
decodedMessage = MessageBuilder.withPayload(payload)
|
||||||
.setHeaders(StompHeaderAccessor.create(stompCommand, headers)).build();
|
.setHeaders(StompHeaderAccessor.create(stompCommand, headers)).build();
|
||||||
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Decoded " + decodedMessage);
|
logger.debug("Decoded " + decodedMessage);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Received incomplete frame. Resetting buffer");
|
||||||
|
}
|
||||||
|
buffer.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -105,8 +114,10 @@ public class StompDecoder {
|
||||||
String header = new String(headerStream.toByteArray(), UTF8_CHARSET);
|
String header = new String(headerStream.toByteArray(), UTF8_CHARSET);
|
||||||
int colonIndex = header.indexOf(':');
|
int colonIndex = header.indexOf(':');
|
||||||
if ((colonIndex <= 0) || (colonIndex == header.length() - 1)) {
|
if ((colonIndex <= 0) || (colonIndex == header.length() - 1)) {
|
||||||
throw new StompConversionException(
|
if (buffer.remaining() > 0) {
|
||||||
"Illegal header: '" + header + "'. A header must be of the form <name>:<value");
|
throw new StompConversionException(
|
||||||
|
"Illegal header: '" + header + "'. A header must be of the form <name>:<value>");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
String headerName = unescape(header.substring(0, colonIndex));
|
String headerName = unescape(header.substring(0, colonIndex));
|
||||||
|
@ -133,10 +144,15 @@ public class StompDecoder {
|
||||||
if (contentLengthString != null) {
|
if (contentLengthString != null) {
|
||||||
int contentLength = Integer.valueOf(contentLengthString);
|
int contentLength = Integer.valueOf(contentLengthString);
|
||||||
byte[] payload = new byte[contentLength];
|
byte[] payload = new byte[contentLength];
|
||||||
buffer.get(payload);
|
if (buffer.remaining() > contentLength) {
|
||||||
if (buffer.remaining() < 1 || buffer.get() != 0) {
|
buffer.get(payload);
|
||||||
throw new StompConversionException("Frame must be terminated with a null octect");
|
if (buffer.get() != 0) {
|
||||||
|
throw new StompConversionException("Frame must be terminated with a null octet");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return payload;
|
return payload;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -151,7 +167,7 @@ public class StompDecoder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new StompConversionException("Frame must be terminated with a null octect");
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void skipLeadingEol(ByteBuffer buffer) {
|
private void skipLeadingEol(ByteBuffer buffer) {
|
||||||
|
|
|
@ -158,6 +158,30 @@ public class StompCodecTests {
|
||||||
assertEquals(StompCommand.DISCONNECT, StompHeaderAccessor.wrap(messages.get(1)).getCommand());
|
assertEquals(StompCommand.DISCONNECT, StompHeaderAccessor.wrap(messages.get(1)).getCommand());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void decodeFrameWithIncompleteHeader() {
|
||||||
|
assertIncompleteDecode("SEND\ndestination");
|
||||||
|
assertIncompleteDecode("SEND\ndestination:");
|
||||||
|
assertIncompleteDecode("SEND\ndestination:test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void decodeFrameWithoutNullOctetTerminator() {
|
||||||
|
assertIncompleteDecode("SEND\ndestination:test\n");
|
||||||
|
assertIncompleteDecode("SEND\ndestination:test\n\n");
|
||||||
|
assertIncompleteDecode("SEND\ndestination:test\n\nThe body");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void decodeFrameWithInsufficientContent() {
|
||||||
|
assertIncompleteDecode("SEND\ncontent-length:23\n\nThe body of the mess");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected=StompConversionException.class)
|
||||||
|
public void decodeFrameWithIncorrectTerminator() {
|
||||||
|
decode("SEND\ncontent-length:23\n\nThe body of the message*");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void decodeHeartbeat() {
|
public void decodeHeartbeat() {
|
||||||
String frame = "\n";
|
String frame = "\n";
|
||||||
|
@ -219,11 +243,28 @@ public class StompCodecTests {
|
||||||
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", new StompCodec().encoder().apply(frame).asString());
|
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", new StompCodec().encoder().apply(frame).asString());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Message<byte[]> decode(String stompFrame) {
|
private void assertIncompleteDecode(String partialFrame) {
|
||||||
this.decoder.apply(Buffer.wrap(stompFrame));
|
Buffer buffer = Buffer.wrap(partialFrame);
|
||||||
return consumer.arguments.get(0);
|
assertNull(decode(buffer));
|
||||||
|
assertEquals(0, buffer.position());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Message<byte[]> decode(String stompFrame) {
|
||||||
|
Buffer buffer = Buffer.wrap(stompFrame);
|
||||||
|
return decode(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Message<byte[]> decode(Buffer buffer) {
|
||||||
|
this.decoder.apply(buffer);
|
||||||
|
if (consumer.arguments.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return consumer.arguments.get(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private static final class ArgumentCapturingConsumer<T> implements Consumer<T> {
|
private static final class ArgumentCapturingConsumer<T> implements Consumer<T> {
|
||||||
|
|
||||||
private final List<T> arguments = new ArrayList<T>();
|
private final List<T> arguments = new ArrayList<T>();
|
||||||
|
|
Loading…
Reference in New Issue