Add configuration for message buffer size limit

BufferingStompDecoder message buffer size limit can now be configured
with JavaConfig MessageBrokerRegistry.setMessageBufferSizeLimit() or
with XML <websocket:message-brocker message-buffer-size="">.

Issue: SPR-11527
This commit is contained in:
Sebastien Deleuze 2014-03-24 09:28:33 +01:00 committed by Rossen Stoyanchev
parent ebffd67b5e
commit bbdb72d808
14 changed files with 103 additions and 20 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -47,6 +47,8 @@ public class MessageBrokerRegistry {
private ChannelRegistration brokerChannelRegistration = new ChannelRegistration();
private Integer messageBufferSizeLimit;
public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) {
Assert.notNull(clientInboundChannel);
@ -119,6 +121,22 @@ public class MessageBrokerRegistry {
return this.brokerChannelRegistration;
}
/**
* Configure the message buffer size limit in bytes.
* @since 4.0.3
*/
public MessageBrokerRegistry setMessageBufferSizeLimit(Integer messageBufferSizeLimit) {
this.messageBufferSizeLimit = messageBufferSizeLimit;
return this;
}
/**
* Get the message buffer size limit in bytes.
* @since 4.0.3
*/
public Integer getMessageBufferSizeLimit() {
return this.messageBufferSizeLimit;
}
protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
if ((this.simpleBrokerRegistration == null) && (this.brokerRelayRegistration == null)) {

View File

@ -31,7 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* A an extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
* An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
* that chunks any bytes remaining after a single full STOMP frame has been read.
* The remaining bytes may contain more STOMP frames or an incomplete STOMP frame.
*

View File

@ -124,8 +124,11 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
beanName = registerBeanDef(beanDef, parserCxt, source);
RuntimeBeanReference userSessionRegistry = new RuntimeBeanReference(beanName);
String frameBufferSizeAttribute = element.getAttribute("message-buffer-size");
Integer messageBufferSizeLimit = frameBufferSizeAttribute.isEmpty() ? null : Integer.parseInt(frameBufferSizeAttribute);
RuntimeBeanReference subProtocolWsHandler = registerSubProtocolWebSocketHandler(
clientInChannel, clientOutChannel, userSessionRegistry, parserCxt, source);
clientInChannel, clientOutChannel, userSessionRegistry, messageBufferSizeLimit, parserCxt, source);
for(Element stompEndpointElem : DomUtils.getChildElementsByTagName(element, "stomp-endpoint")) {
@ -228,10 +231,14 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
private RuntimeBeanReference registerSubProtocolWebSocketHandler(
RuntimeBeanReference clientInChannel, RuntimeBeanReference clientOutChannel,
RuntimeBeanReference userSessionRegistry, ParserContext parserCxt, Object source) {
RuntimeBeanReference userSessionRegistry, Integer messageBufferSizeLimit,
ParserContext parserCxt, Object source) {
RootBeanDefinition stompHandlerDef = new RootBeanDefinition(StompSubProtocolHandler.class);
stompHandlerDef.getPropertyValues().add("userSessionRegistry", userSessionRegistry);
if(messageBufferSizeLimit != null) {
stompHandlerDef.getPropertyValues().add("messageBufferSizeLimit", messageBufferSizeLimit);
}
registerBeanDef(stompHandlerDef, parserCxt, source);
ConstructorArgumentValues cavs = new ConstructorArgumentValues();

View File

@ -21,6 +21,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.user.UserSessionRegistry;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
@ -57,7 +58,8 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry {
public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler,
UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) {
UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler,
MessageBrokerRegistry brokerRegistry) {
Assert.notNull(webSocketHandler);
Assert.notNull(userSessionRegistry);
@ -67,6 +69,9 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry {
this.stompHandler = new StompSubProtocolHandler();
this.stompHandler.setUserSessionRegistry(userSessionRegistry);
this.sockJsScheduler = defaultSockJsTaskScheduler;
if(brokerRegistry.getMessageBufferSizeLimit() != null) {
this.stompHandler.setMessageBufferSizeLimit(brokerRegistry.getMessageBufferSizeLimit());
}
}
private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -42,7 +42,8 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
@Bean
public HandlerMapping stompWebSocketHandlerMapping() {
WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
subProtocolWebSocketHandler(), userSessionRegistry(), messageBrokerSockJsTaskScheduler());
subProtocolWebSocketHandler(), userSessionRegistry(),
messageBrokerSockJsTaskScheduler(), getBrokerRegistry());
registerStompEndpoints(registry);
return registry.getHandlerMapping();
}

View File

@ -79,16 +79,16 @@ public class StompSubProtocolHandler implements SubProtocolHandler {
/**
* TODO
* @param messageBufferSizeLimit
* Set the message buffer size limit in bytes.
* @since 4.0.3
*/
public void setMessageBufferSizeLimit(int messageBufferSizeLimit) {
this.messageBufferSizeLimit = messageBufferSizeLimit;
}
/**
* TODO
* @return
* Get the message buffer size limit in bytes.
* @since 4.0.3
*/
public int getMessageBufferSizeLimit() {
return this.messageBufferSizeLimit;

View File

@ -575,6 +575,13 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="message-buffer-size" type="xsd:int">
<xsd:annotation>
<xsd:documentation><![CDATA[
The message buffer size limit in bytes for simple messaging protocols like STOMP.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="order" type="xsd:token">
<xsd:annotation>
<xsd:documentation><![CDATA[

View File

@ -114,6 +114,9 @@ public class MessageBrokerBeanDefinitionParserTests {
(StompSubProtocolHandler) subProtocolWsHandler.getProtocolHandlerMap().get("v12.stomp");
assertNotNull(stompHandler);
int messageBufferSizeLimit = (int)new DirectFieldAccessor(stompHandler).getPropertyValue("messageBufferSizeLimit");
assertEquals(123, messageBufferSizeLimit);
httpRequestHandler = (HttpRequestHandler) suhm.getUrlMap().get("/test/**");
assertNotNull(httpRequestHandler);
assertThat(httpRequestHandler, Matchers.instanceOf(SockJsHttpRequestHandler.class));

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,8 +22,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.user.DefaultUserSessionRegistry;
import org.springframework.messaging.simp.user.UserSessionRegistry;
import org.springframework.scheduling.TaskScheduler;
@ -47,15 +47,19 @@ public class WebMvcStompEndpointRegistryTests {
private UserSessionRegistry userSessionRegistry;
private MessageBrokerRegistry messageBrokerRegistry;
@Before
public void setup() {
MessageChannel inChannel = Mockito.mock(MessageChannel.class);
SubscribableChannel inChannel = Mockito.mock(SubscribableChannel.class);
SubscribableChannel outChannel = Mockito.mock(SubscribableChannel.class);
this.webSocketHandler = new SubProtocolWebSocketHandler(inChannel, outChannel);
this.userSessionRegistry = new DefaultUserSessionRegistry();
this.messageBrokerRegistry = new MessageBrokerRegistry(inChannel, outChannel);
TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class);
this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, userSessionRegistry, taskScheduler);
this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, userSessionRegistry,
taskScheduler, messageBrokerRegistry);
}

View File

@ -24,11 +24,13 @@ import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.handler.annotation.MessageMapping;
@ -43,7 +45,9 @@ import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TestWebSocketSession;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.web.socket.messaging.StompTextMessageBuilder;
import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import static org.junit.Assert.*;
@ -104,6 +108,18 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
assertTrue(handlers.iterator().next() instanceof SubProtocolWebSocketHandler);
}
@Test
public void maxFrameBufferSize() {
SubProtocolWebSocketHandler subProtocolWebSocketHandler = this.config.getBean("subProtocolWebSocketHandler", SubProtocolWebSocketHandler.class);
List<SubProtocolHandler> protocolHandlers = subProtocolWebSocketHandler.getProtocolHandlers();
for(SubProtocolHandler protocolHandler : protocolHandlers) {
assertTrue(protocolHandler instanceof StompSubProtocolHandler);
DirectFieldAccessor protocolHandlerFieldAccessor = new DirectFieldAccessor(protocolHandler);
assertEquals(123, protocolHandlerFieldAccessor.getPropertyValue("messageBufferSizeLimit"));
}
}
@Controller
static class TestController {
@ -133,6 +149,11 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
registry.addEndpoint("/simpleBroker");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setMessageBufferSizeLimit(123);
}
}
@Configuration

View File

@ -62,6 +62,12 @@ public class TestWebSocketSession implements WebSocketSession {
private HttpHeaders headers;
public TestWebSocketSession() {
}
public TestWebSocketSession(String id) {
this.id = id;
}
@Override
public String getId() {

View File

@ -146,8 +146,10 @@ public class StompSubProtocolHandlerTests {
assertEquals(1, this.session.getSentMessages().size());
TextMessage textMessage = (TextMessage) this.session.getSentMessages().get(0);
List<Message<byte[]>> message = new StompDecoder().decode(ByteBuffer.wrap(textMessage.getPayload().getBytes()));
StompHeaderAccessor replyHeaders = StompHeaderAccessor.wrap(message.get(0));
List<Message<byte[]>> messages = new StompDecoder().decode(ByteBuffer.wrap(textMessage.getPayload().getBytes()));
assertEquals(1, messages.size());
StompHeaderAccessor replyHeaders = StompHeaderAccessor.wrap(messages.get(0));
assertEquals(StompCommand.CONNECTED, replyHeaders.getCommand());
assertEquals("1.1", replyHeaders.getVersion());

View File

@ -4,7 +4,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd">
<websocket:message-broker application-destination-prefix="/app" user-destination-prefix="/personal">
<websocket:message-broker application-destination-prefix="/app" user-destination-prefix="/personal" message-buffer-size="123">
<websocket:stomp-endpoint path=" /foo,/bar">
<websocket:handshake-handler ref="myHandler"/>
</websocket:stomp-endpoint>

View File

@ -37563,7 +37563,8 @@ The Spring Framework provides support for using STOMP over WebSocket through
the +spring-messaging+ and +spring-websocket+ modules. It's easy to enable it.
Here is an example of configuring a STOMP WebSocket endpoint with SockJS fallback
options. The endpoint is available for clients to connect to at URL path `/portfolio`:
options. The endpoint is available for clients to connect to at URL path `/app/portfolio`.
It is configured with a 1 Mbytes message buffer size limit (64 Kbytes by default):
[source,java,indent=0]
[subs="verbatim,quotes"]
@ -37575,6 +37576,13 @@ options. The endpoint is available for clients to connect to at URL path `/portf
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app")
.setMessageBufferSizeLimit(1024*1024)
.enableSimpleBroker("/queue", "/topic");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
@ -37599,10 +37607,11 @@ XML configuration equivalent:
http://www.springframework.org/schema/websocket
http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd">
<websocket:message-broker>
<websocket:message-broker application-destination-prefix="/app" message-buffer-size="1048576">
<websocket:stomp-endpoint path="/portfolio">
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:simple-broker prefix="/queue, /topic"/>
...
</websocket:message-broker>