Polish StompDecoder and the new Buffering sub-class
Issue: SPR-11527
This commit is contained in:
parent
bbdb72d808
commit
545c4effb1
|
@ -23,7 +23,6 @@ import org.springframework.util.LinkedMultiValueMap;
|
||||||
import org.springframework.util.MultiValueMap;
|
import org.springframework.util.MultiValueMap;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
@ -32,13 +31,17 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
|
* An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
|
||||||
* that chunks any bytes remaining after a single full STOMP frame has been read.
|
* that buffers content remaining in the input ByteBuffer after the parent
|
||||||
* The remaining bytes may contain more STOMP frames or an incomplete STOMP frame.
|
* class has read all (complete) STOMP frames from it. The remaining content
|
||||||
|
* represents an incomplete STOMP frame. When called repeatedly with additional
|
||||||
|
* data, the decode method returns one or more messages or, if there is not
|
||||||
|
* enough data still, continues to buffer.
|
||||||
*
|
*
|
||||||
* <p>Similarly if there is not enough content for a full STOMP frame, the content
|
* <p>A single instance of this decoder can be invoked repeatedly to read all
|
||||||
* is buffered until more input is received. That means the
|
* messages from a single stream (e.g. WebSocket session) as long as decoding
|
||||||
* {@link #decode(java.nio.ByteBuffer)} effectively never returns {@code null} as
|
* does not fail. If there is an exception, StompDecoder instance should not
|
||||||
* the parent class does.
|
* be used any more as its internal state is not guaranteed to be consistent.
|
||||||
|
* It is expected that the underlying session is closed at that point.
|
||||||
*
|
*
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
* @since 4.0.3
|
* @since 4.0.3
|
||||||
|
@ -58,10 +61,16 @@ public class BufferingStompDecoder extends StompDecoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the configured buffer size limit.
|
||||||
|
*/
|
||||||
public int getBufferSizeLimit() {
|
public int getBufferSizeLimit() {
|
||||||
return this.bufferSizeLimit;
|
return this.bufferSizeLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the current buffer size.
|
||||||
|
*/
|
||||||
public int getBufferSize() {
|
public int getBufferSize() {
|
||||||
int size = 0;
|
int size = 0;
|
||||||
for (ByteBuffer buffer : this.chunks) {
|
for (ByteBuffer buffer : this.chunks) {
|
||||||
|
@ -70,15 +79,36 @@ public class BufferingStompDecoder extends StompDecoder {
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the expected content length of the currently buffered, incomplete STOMP frame.
|
||||||
|
*/
|
||||||
public Integer getExpectedContentLength() {
|
public Integer getExpectedContentLength() {
|
||||||
return this.expectedContentLength;
|
return this.expectedContentLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
|
||||||
|
* list of {@link Message}s.
|
||||||
|
*
|
||||||
|
* <p>If there was enough data to parse a "content-length" header, then the
|
||||||
|
* value is used to determine how much more data is needed before a new
|
||||||
|
* attempt to decode is made.
|
||||||
|
*
|
||||||
|
* <p>If there was not enough data to parse the "content-length", or if there
|
||||||
|
* is "content-length" header, every subsequent call to decode attempts to
|
||||||
|
* parse again with all available data. Therefore the presence of a "content-length"
|
||||||
|
* header helps to optimize the decoding of large messages.
|
||||||
|
*
|
||||||
|
* @param newBuffer a buffer containing new data to decode
|
||||||
|
*
|
||||||
|
* @return decoded messages or an empty list
|
||||||
|
* @throws StompConversionException raised in case of decoding issues
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<Message<byte[]>> decode(ByteBuffer newData) {
|
public List<Message<byte[]>> decode(ByteBuffer newBuffer) {
|
||||||
|
|
||||||
this.chunks.add(newData);
|
this.chunks.add(newBuffer);
|
||||||
|
|
||||||
checkBufferLimits();
|
checkBufferLimits();
|
||||||
|
|
||||||
|
@ -86,13 +116,13 @@ public class BufferingStompDecoder extends StompDecoder {
|
||||||
return Collections.<Message<byte[]>>emptyList();
|
return Collections.<Message<byte[]>>emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteBuffer buffer = assembleChunksAndReset();
|
ByteBuffer bufferToDecode = assembleChunksAndReset();
|
||||||
|
|
||||||
MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
|
MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
|
||||||
List<Message<byte[]>> messages = decode(buffer, headers);
|
List<Message<byte[]>> messages = decode(bufferToDecode, headers);
|
||||||
|
|
||||||
if (buffer.hasRemaining()) {
|
if (bufferToDecode.hasRemaining()) {
|
||||||
this.chunks.add(buffer);
|
this.chunks.add(bufferToDecode);
|
||||||
this.expectedContentLength = getContentLength(headers);
|
this.expectedContentLength = getContentLength(headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,14 +28,18 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.simp.SimpMessageType;
|
import org.springframework.messaging.simp.SimpMessageType;
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.LinkedMultiValueMap;
|
import org.springframework.util.LinkedMultiValueMap;
|
||||||
import org.springframework.util.MultiValueMap;
|
import org.springframework.util.MultiValueMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decodes one or more STOMP frames from a {@link ByteBuffer}. If the buffer
|
* Decodes one or more STOMP frames contained in a {@link ByteBuffer}.
|
||||||
* contains any additional (incomplete) data, or perhaps not enough data to
|
*
|
||||||
* form even one Message, the the buffer is reset and the value returned is
|
* <p>An attempt is made to read all complete STOMP frames from the buffer, which
|
||||||
* an empty list indicating that no more message can be read.
|
* could be zero, one, or more. If there is any left-over content, i.e. an incomplete
|
||||||
|
* STOMP frame, at the end the buffer is reset to point to the beginning of the
|
||||||
|
* partial content. The caller is then responsible for dealing with that
|
||||||
|
* incomplete content by buffering until there is more input available.
|
||||||
*
|
*
|
||||||
* @author Andy Wilkinson
|
* @author Andy Wilkinson
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
|
@ -52,10 +56,8 @@ public class StompDecoder {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decodes one or more STOMP frames from the given {@code buffer} into a
|
* Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
|
||||||
* list of {@link Message}s.
|
* list of {@link Message}s. If the input buffer contains any incplcontains partial STOMP frame content, or additional
|
||||||
*
|
|
||||||
* <p>If the given ByteBuffer contains partial STOMP frame content, or additional
|
|
||||||
* content with a partial STOMP frame, the buffer is reset and {@code null} is
|
* content with a partial STOMP frame, the buffer is reset and {@code null} is
|
||||||
* returned.
|
* returned.
|
||||||
*
|
*
|
||||||
|
@ -68,27 +70,37 @@ public class StompDecoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decodes one or more STOMP frames from the given {@code buffer} into a
|
* Decodes one or more STOMP frames from the given {@code buffer} and returns
|
||||||
* list of {@link Message}s.
|
* a list of {@link Message}s.
|
||||||
*
|
*
|
||||||
* <p>If the given ByteBuffer contains partial STOMP frame content, or additional
|
* <p>If the given ByteBuffer contains only partial STOMP frame content and no
|
||||||
* content with a partial STOMP frame, the buffer is reset and {@code null} is
|
* complete STOMP frames, an empty list is returned, and the buffer is reset to
|
||||||
* returned.
|
* to where it was.
|
||||||
|
*
|
||||||
|
* <p>If the buffer contains one ore more STOMP frames, those are returned and
|
||||||
|
* the buffer reset to point to the beginning of the unused partial content.
|
||||||
|
*
|
||||||
|
* <p>The input headers map is used to store successfully parsed headers and
|
||||||
|
* is cleared after ever successfully read message. So when partial content is
|
||||||
|
* read the caller can check if a "content-length" header was read, which helps
|
||||||
|
* to determine how much more content is needed before the next STOMP frame
|
||||||
|
* can be decoded.
|
||||||
*
|
*
|
||||||
* @param buffer The buffer to decode the STOMP frame from
|
* @param buffer The buffer to decode the STOMP frame from
|
||||||
* @param headers an empty map that will be filled with the successfully parsed
|
* @param headers an empty map that will contain successfully parsed headers
|
||||||
* headers of the last decoded message, or the last attempt at decoding an
|
* in cases where the partial buffer ended with a partial STOMP frame
|
||||||
* (incomplete) STOMP frame. This can be useful for detecting 'content-length'.
|
|
||||||
*
|
*
|
||||||
* @return the decoded messages or an empty list
|
* @return decoded messages or an empty list
|
||||||
|
* @throws StompConversionException raised in case of decoding issues
|
||||||
*/
|
*/
|
||||||
public List<Message<byte[]>> decode(ByteBuffer buffer, MultiValueMap<String, String> headers) {
|
public List<Message<byte[]>> decode(ByteBuffer buffer, MultiValueMap<String, String> headers) {
|
||||||
|
Assert.notNull(headers, "headers is required");
|
||||||
List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
|
List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
|
||||||
while (buffer.hasRemaining()) {
|
while (buffer.hasRemaining()) {
|
||||||
headers.clear();
|
|
||||||
Message<byte[]> m = decodeMessage(buffer, headers);
|
Message<byte[]> m = decodeMessage(buffer, headers);
|
||||||
if (m != null) {
|
if (m != null) {
|
||||||
messages.add(m);
|
messages.add(m);
|
||||||
|
headers.clear();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -79,7 +79,16 @@ public class StompSubProtocolHandler implements SubProtocolHandler {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the message buffer size limit in bytes.
|
* Configure the maximum size of the buffer used when a STOMP message has been
|
||||||
|
* split over multiple WebSocket messages.
|
||||||
|
*
|
||||||
|
* <p>While the STOMP spec version 1.2 (current as of 4.0.3) does not discuss
|
||||||
|
* STOMP over WebSocket explicitly, a number of clients already split messages
|
||||||
|
* around 16K boundaries. Therefore partial content must be buffered before a
|
||||||
|
* full message can be assembled.
|
||||||
|
*
|
||||||
|
* <p>By default this property is set to 64K.
|
||||||
|
*
|
||||||
* @since 4.0.3
|
* @since 4.0.3
|
||||||
*/
|
*/
|
||||||
public void setMessageBufferSizeLimit(int messageBufferSizeLimit) {
|
public void setMessageBufferSizeLimit(int messageBufferSizeLimit) {
|
||||||
|
@ -87,7 +96,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the message buffer size limit in bytes.
|
* Get the configured message buffer size limit in bytes.
|
||||||
|
*
|
||||||
* @since 4.0.3
|
* @since 4.0.3
|
||||||
*/
|
*/
|
||||||
public int getMessageBufferSizeLimit() {
|
public int getMessageBufferSizeLimit() {
|
||||||
|
|
Loading…
Reference in New Issue