diff --git a/framework-docs/modules/ROOT/pages/web/websocket/stomp/client.adoc b/framework-docs/modules/ROOT/pages/web/websocket/stomp/client.adoc index 5b908999cf..ba205223a8 100644 --- a/framework-docs/modules/ROOT/pages/web/websocket/stomp/client.adoc +++ b/framework-docs/modules/ROOT/pages/web/websocket/stomp/client.adoc @@ -105,12 +105,12 @@ it handle ERROR frames in addition to the `handleException` callback for exceptions from the handling of messages and `handleTransportError` for transport-level errors including `ConnectionLostException`. -You can also use `setInboundMessageSizeLimit(limit)` and `setOutboundMessageSizeLimit(limit)` -to limit the maximum size of inbound and outbound message size. -When outbound message size exceeds `outboundMessageSizeLimit`, message is split into multiple incomplete frames. -Then receiver buffers these incomplete frames and reassemble to complete message. -When inbound message size exceeds `inboundMessageSizeLimit`, throw `StompConversionException`. -The default value of in&outboundMessageSizeLimit is `64KB`. +You can use the `inboundMessageSizeLimit` and `outboundMessageSizeLimit` properties of +`WebSocketStompClient` to limit the maximum size of inbound and outbound WebSocket +messages. When an outbound STOMP message exceeds the limit, it is split into partial frames, +which the receiver would have to reassemble. By default, there is no size limit for outbound +messages. When an inbound STOMP message size exceeds the configured limit, a +`StompConversionException` is thrown. The default size limit for inbound messages is `64KB`. [source,java,indent=0,subs="verbatim,quotes"] ---- diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java index 9f37280ab7..eccfc807bf 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,12 +29,10 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; /** - * An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder} - * that buffers content remaining in the input ByteBuffer after the parent - * class has read all (complete) STOMP frames from it. The remaining content - * represents an incomplete STOMP frame. When called repeatedly with additional - * data, the decode method returns one or more messages or, if there is not - * enough data still, continues to buffer. + * Uses {@link org.springframework.messaging.simp.stomp.StompDecoder} to decode + * a {@link ByteBuffer} to one or more STOMP message. If the message is incomplete, + * unused content is buffered and combined with the next input buffer, or if there + * is not enough data still, continues to buffer. * *

A single instance of this decoder can be invoked repeatedly to read all * messages from a single stream (e.g. WebSocket session) as long as decoding diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/SplittingStompEncoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/SplittingStompEncoder.java index eec6e54dfe..a72b7ff0f1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/SplittingStompEncoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/SplittingStompEncoder.java @@ -24,11 +24,12 @@ import java.util.Map; import org.springframework.util.Assert; /** - * An extension of {@link org.springframework.messaging.simp.stomp.StompEncoder} - * that splits the STOMP message to multiple incomplete STOMP frames - * when the encoded bytes length exceeds {@link SplittingStompEncoder#bufferSizeLimit}. + * Uses {@link org.springframework.messaging.simp.stomp.StompEncoder} to encode + * a message and splits it into parts no larger than the configured + * {@link SplittingStompEncoder#bufferSizeLimit}. * * @author Injae Kim + * @author Rossen Stoyanchev * @since 6.2 * @see StompEncoder */ @@ -38,6 +39,7 @@ public class SplittingStompEncoder { private final int bufferSizeLimit; + public SplittingStompEncoder(StompEncoder encoder, int bufferSizeLimit) { Assert.notNull(encoder, "StompEncoder is required"); Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0"); @@ -45,11 +47,13 @@ public class SplittingStompEncoder { this.bufferSizeLimit = bufferSizeLimit; } + /** - * Encodes the given payload and headers into a list of one or more {@code byte[]}s. - * @param headers the headers - * @param payload the payload - * @return the list of one or more encoded messages + * Encode the given payload and headers to a STOMP frame, and split into a + * list of parts based on the configured buffer size limit. + * @param headers the STOMP message headers + * @param payload the STOMP message payload + * @return the parts of the encoded STOMP message */ public List encode(Map headers, byte[] payload) { byte[] result = this.encoder.encode(headers, payload); @@ -65,4 +69,5 @@ public class SplittingStompEncoder { } return frames; } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java index f2537ca55e..8d9da03c42 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,8 +83,8 @@ public class StompEncoder { /** * Encodes the given payload and headers into a {@code byte[]}. - * @param headers the headers - * @param payload the payload + * @param headers the STOMP message headers + * @param payload the STOMP message payload * @return the encoded message */ public byte[] encode(Map headers, byte[] payload) { diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/SplittingStompEncoderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/SplittingStompEncoderTests.java index 8b37d0ea29..a5d2640671 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/SplittingStompEncoderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/SplittingStompEncoderTests.java @@ -17,13 +17,12 @@ package org.springframework.messaging.simp.stomp; import java.io.ByteArrayOutputStream; -import java.util.Arrays; +import java.nio.charset.StandardCharsets; import java.util.List; import org.junit.jupiter.api.Test; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; +import org.springframework.lang.Nullable; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -32,351 +31,250 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; * Unit tests for {@link SplittingStompEncoder}. * * @author Injae Kim - * @since 6.2 + * @author Rossen Stoyanchev */ public class SplittingStompEncoderTests { - private final StompEncoder STOMP_ENCODER = new StompEncoder(); + private static final StompEncoder ENCODER = new StompEncoder(); + + public static final byte[] EMPTY_PAYLOAD = new byte[0]; - private static final int DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024; @Test public void encodeFrameWithNoHeadersAndNoBody() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + List actual = splittingEncoder(null).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); - - assertThat(outputStream.toString()).isEqualTo("DISCONNECT\n\n\0"); + assertThat(toAggregatedString(actual)).isEqualTo("DISCONNECT\n\n\0"); assertThat(actual.size()).isOne(); } @Test public void encodeFrameWithNoHeadersAndNoBodySplitTwoFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 7); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + List actual = splittingEncoder(7).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); - - assertThat(outputStream.toString()).isEqualTo("DISCONNECT\n\n\0"); + assertThat(toAggregatedString(actual)).isEqualTo("DISCONNECT\n\n\0"); assertThat(actual.size()).isEqualTo(2); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 7)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 7, outputStream.size())); } @Test public void encodeFrameWithNoHeadersAndNoBodySplitMultipleFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 3); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + List actual = splittingEncoder(3).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); - - assertThat(outputStream.toString()).isEqualTo("DISCONNECT\n\n\0"); + assertThat(toAggregatedString(actual)).isEqualTo("DISCONNECT\n\n\0"); assertThat(actual.size()).isEqualTo(5); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 3)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 3, 6)); - assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 6, 9)); - assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 9, 12)); - assertThat(actual.get(4)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 12, outputStream.size())); } @Test public void encodeFrameWithHeaders() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); headers.setAcceptVersion("1.2"); headers.setHost("github.org"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); - String actualString = outputStream.toString(); + List actual = splittingEncoder(null).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); + String output = toAggregatedString(actual); - assertThat("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(actualString) || - "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(actualString)).isTrue(); + List list = List.of( + "CONNECT\naccept-version:1.2\nhost:github.org\n\n\0", + "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0"); + + assertThat(list).contains(output); assertThat(actual.size()).isOne(); } @Test public void encodeFrameWithHeadersSplitTwoFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 30); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); headers.setAcceptVersion("1.2"); headers.setHost("github.org"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); - String actualString = outputStream.toString(); + List actual = splittingEncoder(30).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); + String output = toAggregatedString(actual); - assertThat("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(actualString) || - "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(actualString)).isTrue(); + assertThat("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(output) || + "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(output)).isTrue(); assertThat(actual.size()).isEqualTo(2); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 30)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, outputStream.size())); } @Test public void encodeFrameWithHeadersSplitMultipleFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 10); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); headers.setAcceptVersion("1.2"); headers.setHost("github.org"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); - String actualString = outputStream.toString(); + List actual = splittingEncoder(10).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); + String output = toAggregatedString(actual); - assertThat("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(actualString) || - "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(actualString)).isTrue(); + List list = List.of( + "CONNECT\naccept-version:1.2\nhost:github.org\n\n\0", + "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0"); + + assertThat(list).contains(output); assertThat(actual.size()).isEqualTo(5); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 10)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 10, 20)); - assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, 30)); - assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, 40)); - assertThat(actual.get(4)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 40, outputStream.size())); } @Test public void encodeFrameWithHeadersThatShouldBeEscaped() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(null).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0"); + assertThat(output).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0"); assertThat(actual.size()).isOne(); } @Test public void encodeFrameWithHeadersThatShouldBeEscapedSplitTwoFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 30); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(30).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0"); + assertThat(output).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0"); assertThat(actual.size()).isEqualTo(2); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 30)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, outputStream.size())); } @Test public void encodeFrameWithHeadersThatShouldBeEscapedSplitMultipleFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 10); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); - String actualString = outputStream.toString(); + List actual = splittingEncoder(10).encode(headers.getMessageHeaders(), EMPTY_PAYLOAD); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0"); + assertThat(output).isEqualTo("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0"); assertThat(actual.size()).isEqualTo(5); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 10)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 10, 20)); - assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, 30)); - assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, 40)); - assertThat(actual.get(4)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 40, outputStream.size())); } @Test public void encodeFrameWithHeadersBody() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.addNativeHeader("a", "alpha"); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(null).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isOne(); } @Test public void encodeFrameWithHeadersBodySplitTwoFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 30); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.addNativeHeader("a", "alpha"); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(30).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isEqualTo(2); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 30)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, outputStream.size())); } @Test public void encodeFrameWithHeadersBodySplitMultipleFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 10); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.addNativeHeader("a", "alpha"); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(10).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\na:alpha\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isEqualTo(5); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 10)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 10, 20)); - assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, 30)); - assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, 40)); - assertThat(actual.get(4)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 40, outputStream.size())); } @Test public void encodeFrameWithContentLengthPresent() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, DEFAULT_MESSAGE_MAX_SIZE); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setContentLength(12); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(null).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isOne(); } @Test public void encodeFrameWithContentLengthPresentSplitTwoFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 20); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setContentLength(12); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(20).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isEqualTo(2); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 20)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, outputStream.size())); } @Test public void encodeFrameWithContentLengthPresentSplitMultipleFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 10); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setContentLength(12); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(10).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isEqualTo(4); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 10)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 10, 20)); - assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 20, 30)); - assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 30, outputStream.size())); } @Test public void sameLengthAndBufferSizeLimit() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 44); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.addNativeHeader("a", "1234"); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(44).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isOne(); - assertThat(outputStream.toByteArray().length).isEqualTo(44); } @Test public void lengthAndBufferSizeLimitExactlySplitTwoFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 22); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.addNativeHeader("a", "1234"); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(22).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isEqualTo(2); - assertThat(outputStream.toByteArray().length).isEqualTo(44); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 22)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 22, 44)); } @Test public void lengthAndBufferSizeLimitExactlySplitMultipleFrames() { - SplittingStompEncoder encoder = new SplittingStompEncoder(STOMP_ENCODER, 11); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.addNativeHeader("a", "1234"); - Message frame = MessageBuilder.createMessage( - "Message body".getBytes(), headers.getMessageHeaders()); - List actual = encoder.encode(frame.getHeaders(), frame.getPayload()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - actual.forEach(outputStream::writeBytes); + List actual = splittingEncoder(11).encode(headers.getMessageHeaders(), "Message body".getBytes()); + String output = toAggregatedString(actual); - assertThat(outputStream.toString()).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0"); + assertThat(output).isEqualTo("SEND\na:1234\ncontent-length:12\n\nMessage body\0"); assertThat(actual.size()).isEqualTo(4); - assertThat(outputStream.toByteArray().length).isEqualTo(44); - assertThat(actual.get(0)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 0, 11)); - assertThat(actual.get(1)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 11, 22)); - assertThat(actual.get(2)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 22, 33)); - assertThat(actual.get(3)).isEqualTo(Arrays.copyOfRange(outputStream.toByteArray(), 33, 44)); } @Test public void bufferSizeLimitShouldBePositive() { - assertThatThrownBy(() -> new SplittingStompEncoder(STOMP_ENCODER, 0)) - .isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> new SplittingStompEncoder(STOMP_ENCODER, -1)) - .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> splittingEncoder(0)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> splittingEncoder(-1)).isInstanceOf(IllegalArgumentException.class); + } + + private static SplittingStompEncoder splittingEncoder(@Nullable Integer bufferSizeLimit) { + return new SplittingStompEncoder(ENCODER, (bufferSizeLimit != null ? bufferSizeLimit : 64 * 1024)); + } + + private static String toAggregatedString(List actual) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + actual.forEach(outputStream::writeBytes); + return outputStream.toString(StandardCharsets.UTF_8); } } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java index 7f900496e9..3e464b329a 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java @@ -124,7 +124,7 @@ class WebSocketStompClientIntegrationTests { @Test void publishSubscribeWithSlitMessage() throws Exception { StringBuilder sb = new StringBuilder(); - while (sb.length() < 1024) { + while (sb.length() < 2000) { sb.append("A message with a long body... "); } String payload = sb.toString();