Polishing contribution

Closes gh-31970
This commit is contained in:
rstoyanchev 2024-03-11 15:22:23 +00:00
parent 73ee86c666
commit f9883d8bd6
6 changed files with 105 additions and 204 deletions

View File

@ -105,12 +105,12 @@ it handle ERROR frames in addition to the `handleException` callback for
exceptions from the handling of messages and `handleTransportError` for
transport-level errors including `ConnectionLostException`.
You can also use `setInboundMessageSizeLimit(limit)` and `setOutboundMessageSizeLimit(limit)`
to limit the maximum size of inbound and outbound message size.
When outbound message size exceeds `outboundMessageSizeLimit`, message is split into multiple incomplete frames.
Then receiver buffers these incomplete frames and reassemble to complete message.
When inbound message size exceeds `inboundMessageSizeLimit`, throw `StompConversionException`.
The default value of in&outboundMessageSizeLimit is `64KB`.
You can use the `inboundMessageSizeLimit` and `outboundMessageSizeLimit` properties of
`WebSocketStompClient` to limit the maximum size of inbound and outbound WebSocket
messages. When an outbound STOMP message exceeds the limit, it is split into partial frames,
which the receiver would have to reassemble. By default, there is no size limit for outbound
messages. When an inbound STOMP message size exceeds the configured limit, a
`StompConversionException` is thrown. The default size limit for inbound messages is `64KB`.
[source,java,indent=0,subs="verbatim,quotes"]
----

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,12 +29,10 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
* that buffers content remaining in the input ByteBuffer after the parent
* class has read all (complete) STOMP frames from it. The remaining content
* represents an incomplete STOMP frame. When called repeatedly with additional
* data, the decode method returns one or more messages or, if there is not
* enough data still, continues to buffer.
* Uses {@link org.springframework.messaging.simp.stomp.StompDecoder} to decode
* a {@link ByteBuffer} to one or more STOMP message. If the message is incomplete,
* unused content is buffered and combined with the next input buffer, or if there
* is not enough data still, continues to buffer.
*
* <p>A single instance of this decoder can be invoked repeatedly to read all
* messages from a single stream (e.g. WebSocket session) as long as decoding

View File

@ -24,11 +24,12 @@ import java.util.Map;
import org.springframework.util.Assert;
/**
* An extension of {@link org.springframework.messaging.simp.stomp.StompEncoder}
* that splits the STOMP message to multiple incomplete STOMP frames
* when the encoded bytes length exceeds {@link SplittingStompEncoder#bufferSizeLimit}.
* Uses {@link org.springframework.messaging.simp.stomp.StompEncoder} to encode
* a message and splits it into parts no larger than the configured
* {@link SplittingStompEncoder#bufferSizeLimit}.
*
* @author Injae Kim
* @author Rossen Stoyanchev
* @since 6.2
* @see StompEncoder
*/
@ -38,6 +39,7 @@ public class SplittingStompEncoder {
private final int bufferSizeLimit;
public SplittingStompEncoder(StompEncoder encoder, int bufferSizeLimit) {
Assert.notNull(encoder, "StompEncoder is required");
Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
@ -45,11 +47,13 @@ public class SplittingStompEncoder {
this.bufferSizeLimit = bufferSizeLimit;
}
/**
* Encodes the given payload and headers into a list of one or more {@code byte[]}s.
* @param headers the headers
* @param payload the payload
* @return the list of one or more encoded messages
* Encode the given payload and headers to a STOMP frame, and split into a
* list of parts based on the configured buffer size limit.
* @param headers the STOMP message headers
* @param payload the STOMP message payload
* @return the parts of the encoded STOMP message
*/
public List<byte[]> encode(Map<String, Object> headers, byte[] payload) {
byte[] result = this.encoder.encode(headers, payload);
@ -65,4 +69,5 @@ public class SplittingStompEncoder {
}
return frames;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -83,8 +83,8 @@ public class StompEncoder {
/**
* Encodes the given payload and headers into a {@code byte[]}.
* @param headers the headers
* @param payload the payload
* @param headers the STOMP message headers
* @param payload the STOMP message payload
* @return the encoded message
*/
public byte[] encode(Map<String, Object> headers, byte[] payload) {

View File

@ -17,13 +17,12 @@
package org.springframework.messaging.simp.stomp;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.lang.Nullable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@ -32,351 +31,250 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
* Unit tests for {@link SplittingStompEncoder}.
*
* @author Injae Kim
* @since 6.2
* @author Rossen Stoyanchev
*/
public class SplittingStompEncoderTests {
private final StompEncoder STOMP_ENCODER = new StompEncoder();
private static final StompEncoder ENCODER = new StompEncoder();
public static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final int DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024;
@Test
public void encodeFrameWithNoHeadersAndNoBody() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = splittingEncoder(null).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
assertThat(outputStream.toString()).isEqualTo("DISCONNECT\n\n\0");
assertThat(toAggregatedString(actual)).isEqualTo("DISCONNECT\n\n\0");
assertThat(actual.size()).isOne();
}
@Test
public void encodeFrameWithNoHeadersAndNoBodySplitTwoFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 7);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = splittingEncoder(7).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
assertThat(outputStream.toString()).isEqualTo("DISCONNECT\n\n\0");
assertThat(toAggregatedString(actual)).isEqualTo("DISCONNECT\n\n\0");
assertThat(actual.size()).isEqualTo(2);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 7));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 7, outputStream.size()));
}
@Test
public void encodeFrameWithNoHeadersAndNoBodySplitMultipleFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 3);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = splittingEncoder(3).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
assertThat(outputStream.toString()).isEqualTo("DISCONNECT\n\n\0");
assertThat(toAggregatedString(actual)).isEqualTo("DISCONNECT\n\n\0");
assertThat(actual.size()).isEqualTo(5);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 3));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 3, 6));
assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 6, 9));
assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 9, 12));
assertThat(actual.get(4)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 12, outputStream.size()));
}
@Test
public void encodeFrameWithHeaders() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setAcceptVersion("1.2");
headers.setHost("github.org");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
String actualString = outputStream.toString();
List<byte[]> actual = splittingEncoder(null).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
String output = toAggregatedString(actual);
assertThat("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(actualString) ||
"CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(actualString)).isTrue();
List<String> list = List.of(
"CONNECT\naccept-version:1.2\nhost:github.org\n\n\0",
"CONNECT\nhost:github.org\naccept-version:1.2\n\n\0");
assertThat(list).contains(output);
assertThat(actual.size()).isOne();
}
@Test
public void encodeFrameWithHeadersSplitTwoFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 30);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setAcceptVersion("1.2");
headers.setHost("github.org");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
String actualString = outputStream.toString();
List<byte[]> actual = splittingEncoder(30).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
String output = toAggregatedString(actual);
assertThat("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(actualString) ||
"CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(actualString)).isTrue();
assertThat("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(output) ||
"CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(output)).isTrue();
assertThat(actual.size()).isEqualTo(2);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 30));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, outputStream.size()));
}
@Test
public void encodeFrameWithHeadersSplitMultipleFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 10);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setAcceptVersion("1.2");
headers.setHost("github.org");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
String actualString = outputStream.toString();
List<byte[]> actual = splittingEncoder(10).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
String output = toAggregatedString(actual);
assertThat("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(actualString) ||
"CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(actualString)).isTrue();
List<String> list = List.of(
"CONNECT\naccept-version:1.2\nhost:github.org\n\n\0",
"CONNECT\nhost:github.org\naccept-version:1.2\n\n\0");
assertThat(list).contains(output);
assertThat(actual.size()).isEqualTo(5);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 10));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 10, 20));
assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, 30));
assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, 40));
assertThat(actual.get(4)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 40, outputStream.size()));
}
@Test
public void encodeFrameWithHeadersThatShouldBeEscaped() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(null).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0");
assertThat(output).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0");
assertThat(actual.size()).isOne();
}
@Test
public void encodeFrameWithHeadersThatShouldBeEscapedSplitTwoFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 30);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(30).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0");
assertThat(output).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0");
assertThat(actual.size()).isEqualTo(2);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 30));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, outputStream.size()));
}
@Test
public void encodeFrameWithHeadersThatShouldBeEscapedSplitMultipleFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 10);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
String actualString = outputStream.toString();
List<byte[]> actual = splittingEncoder(10).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD);
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0");
assertThat(output).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0");
assertThat(actual.size()).isEqualTo(5);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 10));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 10, 20));
assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, 30));
assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, 40));
assertThat(actual.get(4)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 40, outputStream.size()));
}
@Test
public void encodeFrameWithHeadersBody() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "alpha");
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(null).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isOne();
}
@Test
public void encodeFrameWithHeadersBodySplitTwoFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 30);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "alpha");
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(30).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isEqualTo(2);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 30));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, outputStream.size()));
}
@Test
public void encodeFrameWithHeadersBodySplitMultipleFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 10);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "alpha");
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(10).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isEqualTo(5);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 10));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 10, 20));
assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, 30));
assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, 40));
assertThat(actual.get(4)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 40, outputStream.size()));
}
@Test
public void encodeFrameWithContentLengthPresent() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setContentLength(12);
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(null).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isOne();
}
@Test
public void encodeFrameWithContentLengthPresentSplitTwoFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 20);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setContentLength(12);
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(20).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isEqualTo(2);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 20));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, outputStream.size()));
}
@Test
public void encodeFrameWithContentLengthPresentSplitMultipleFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 10);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setContentLength(12);
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(10).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isEqualTo(4);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 10));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 10, 20));
assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, 30));
assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, outputStream.size()));
}
@Test
public void sameLengthAndBufferSizeLimit() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 44);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "1234");
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(44).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isOne();
assertThat(outputStream.toByteArray().length).isEqualTo(44);
}
@Test
public void lengthAndBufferSizeLimitExactlySplitTwoFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 22);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "1234");
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(22).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isEqualTo(2);
assertThat(outputStream.toByteArray().length).isEqualTo(44);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 22));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 22, 44));
}
@Test
public void lengthAndBufferSizeLimitExactlySplitMultipleFrames() {
SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 11);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "1234");
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
List<byte[]> actual = encoder.encode(frame.getHeaders(), frame.getPayload());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
List<byte[]> actual = splittingEncoder(11).encode(headers.getMessageHeaders(), "Message body".getBytes());
String output = toAggregatedString(actual);
assertThat(outputStream.toString()).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0");
assertThat(output).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0");
assertThat(actual.size()).isEqualTo(4);
assertThat(outputStream.toByteArray().length).isEqualTo(44);
assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 11));
assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 11, 22));
assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 22, 33));
assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 33, 44));
}
@Test
public void bufferSizeLimitShouldBePositive() {
assertThatThrownBy(() -> new SplittingStompEncoder(STOMP_ENCODER, 0))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> new SplittingStompEncoder(STOMP_ENCODER, -1))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> splittingEncoder(0)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> splittingEncoder(-1)).isInstanceOf(IllegalArgumentException.class);
}
private static SplittingStompEncoder splittingEncoder(@Nullable Integer bufferSizeLimit) {
return new SplittingStompEncoder(ENCODER, (bufferSizeLimit != null ? bufferSizeLimit : 64 * 1024));
}
private static String toAggregatedString(List<byte[]> actual) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
actual.forEach(outputStream::writeBytes);
return outputStream.toString(StandardCharsets.UTF_8);
}
}

View File

@ -124,7 +124,7 @@ class WebSocketStompClientIntegrationTests {
@Test
void publishSubscribeWithSlitMessage() throws Exception {
StringBuilder sb = new StringBuilder();
while (sb.length() < 1024) {
while (sb.length() < 2000) {
sb.append("A message with a long body... ");
}
String payload = sb.toString();