From c11484b2e723e2a6ec18e2439ab90c76019ef306 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 24 Mar 2014 18:05:37 -0400 Subject: [PATCH] Add WebSocket transport configuration support Issue: SPR-11527 --- .../simp/config/MessageBrokerRegistry.java | 19 --- .../MessageBrokerBeanDefinitionParser.java | 30 +++-- ...tractWebSocketMessageBrokerConfigurer.java | 2 +- ...ngWebSocketMessageBrokerConfiguration.java | 7 ++ .../WebMvcStompEndpointRegistry.java | 15 +-- ...cketMessageBrokerConfigurationSupport.java | 3 +- .../WebSocketTransportRegistration.java | 16 +-- .../messaging/StompSubProtocolHandler.java | 24 ++-- .../socket/config/spring-websocket-4.0.xsd | 112 ++++++++++++++++-- ...essageBrokerBeanDefinitionParserTests.java | 6 +- .../WebMvcStompEndpointRegistryTests.java | 12 +- ...essageBrokerConfigurationSupportTests.java | 23 ++-- .../config/websocket-config-broker-simple.xml | 9 +- src/asciidoc/index.adoc | 6 +- 14 files changed, 184 insertions(+), 100 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java index 9e48f956ab9..11400f9eefe 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java @@ -47,8 +47,6 @@ public class MessageBrokerRegistry { private ChannelRegistration brokerChannelRegistration = new ChannelRegistration(); - private Integer messageBufferSizeLimit; - public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) { Assert.notNull(clientInboundChannel); @@ -121,23 +119,6 @@ public class MessageBrokerRegistry { return this.brokerChannelRegistration; } - /** - * Configure the message buffer size limit in bytes. - * @since 4.0.3 - */ - public MessageBrokerRegistry setMessageBufferSizeLimit(Integer messageBufferSizeLimit) { - this.messageBufferSizeLimit = messageBufferSizeLimit; - return this; - } - - /** - * Get the message buffer size limit in bytes. - * @since 4.0.3 - */ - public Integer getMessageBufferSizeLimit() { - return this.messageBufferSizeLimit; - } - protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) { if ((this.simpleBrokerRegistration == null) && (this.brokerRelayRegistration == null)) { enableSimpleBroker(); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index 2fcda4fe5e9..4bfc632782d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -124,11 +124,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { beanName = registerBeanDef(beanDef, parserCxt, source); RuntimeBeanReference userSessionRegistry = new RuntimeBeanReference(beanName); - String frameBufferSizeAttribute = element.getAttribute("message-buffer-size"); - Integer messageBufferSizeLimit = frameBufferSizeAttribute.isEmpty() ? null : Integer.parseInt(frameBufferSizeAttribute); - RuntimeBeanReference subProtocolWsHandler = registerSubProtocolWebSocketHandler( - clientInChannel, clientOutChannel, userSessionRegistry, messageBufferSizeLimit, parserCxt, source); + element, clientInChannel, clientOutChannel, userSessionRegistry, parserCxt, source); for(Element stompEndpointElem : DomUtils.getChildElementsByTagName(element, "stomp-endpoint")) { @@ -229,16 +226,12 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { return new RuntimeBeanReference(channelName); } - private RuntimeBeanReference registerSubProtocolWebSocketHandler( + private RuntimeBeanReference registerSubProtocolWebSocketHandler(Element element, RuntimeBeanReference clientInChannel, RuntimeBeanReference clientOutChannel, - RuntimeBeanReference userSessionRegistry, Integer messageBufferSizeLimit, - ParserContext parserCxt, Object source) { + RuntimeBeanReference userSessionRegistry, ParserContext parserCxt, Object source) { RootBeanDefinition stompHandlerDef = new RootBeanDefinition(StompSubProtocolHandler.class); stompHandlerDef.getPropertyValues().add("userSessionRegistry", userSessionRegistry); - if(messageBufferSizeLimit != null) { - stompHandlerDef.getPropertyValues().add("messageBufferSizeLimit", messageBufferSizeLimit); - } registerBeanDef(stompHandlerDef, parserCxt, source); ConstructorArgumentValues cavs = new ConstructorArgumentValues(); @@ -248,6 +241,23 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { RootBeanDefinition subProtocolWshDef = new RootBeanDefinition(SubProtocolWebSocketHandler.class, cavs, null); subProtocolWshDef.getPropertyValues().addPropertyValue("protocolHandlers", stompHandlerDef); String subProtocolWshName = registerBeanDef(subProtocolWshDef, parserCxt, source); + + Element transportElem = DomUtils.getChildElementByTagName(element, "transport"); + if (transportElem != null) { + String messageSize = transportElem.getAttribute("message-size"); + if (messageSize != null) { + stompHandlerDef.getPropertyValues().add("messageSizeLimit", messageSize); + } + String sendTimeLimit = transportElem.getAttribute("send-timeout"); + if (sendTimeLimit != null) { + subProtocolWshDef.getPropertyValues().add("sendTimeLimit", sendTimeLimit); + } + String sendBufferSizeLimit = transportElem.getAttribute("send-buffer-size"); + if (sendBufferSizeLimit != null) { + subProtocolWshDef.getPropertyValues().add("sendBufferSizeLimit", sendBufferSizeLimit); + } + } + return new RuntimeBeanReference(subProtocolWshName); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java index e5a89b65328..f3b0e101c44 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java @@ -33,7 +33,7 @@ public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSoc @Override - public void configureWebSocketTransport(WebSocketTransportRegistration registry) { + public void configureWebSocketTransport(WebSocketTransportRegistration registration) { } @Override diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.java index 45ef5e95660..c7917a3008e 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.java @@ -58,6 +58,13 @@ public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMess } } + @Override + protected void configureWebSocketTransport(WebSocketTransportRegistration registration) { + for (WebSocketMessageBrokerConfigurer c : this.configurers) { + c.configureWebSocketTransport(registration); + } + } + @Override protected void configureClientInboundChannel(ChannelRegistration registration) { for (WebSocketMessageBrokerConfigurer c : this.configurers) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java index 26f8c6a89f1..7a3fcee55e0 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.user.UserSessionRegistry; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.Assert; @@ -58,8 +57,8 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry { public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler, - WebSocketTransportRegistration transportRegistration, UserSessionRegistry userSessionRegistry, - TaskScheduler defaultSockJsTaskScheduler, MessageBrokerRegistry brokerRegistry) { + WebSocketTransportRegistration transportRegistration, + UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) { Assert.notNull(webSocketHandler, "'webSocketHandler' is required "); Assert.notNull(transportRegistration, "'transportRegistration' is required"); @@ -78,15 +77,11 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry { this.stompHandler = new StompSubProtocolHandler(); this.stompHandler.setUserSessionRegistry(userSessionRegistry); - if (transportRegistration.getMessageBufferSizeLimit() != null) { - this.stompHandler.setMessageBufferSizeLimit(transportRegistration.getMessageBufferSizeLimit()); + if (transportRegistration.getMessageSizeLimit() != null) { + this.stompHandler.setMessageSizeLimit(transportRegistration.getMessageSizeLimit()); } this.sockJsScheduler = defaultSockJsTaskScheduler; - - if(brokerRegistry.getMessageBufferSizeLimit() != null) { - this.stompHandler.setMessageBufferSizeLimit(brokerRegistry.getMessageBufferSizeLimit()); - } } private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler wsHandler) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index 6a0b8c49d87..216bca35de3 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -51,10 +51,9 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac UserSessionRegistry sessionRegistry = userSessionRegistry(); WebSocketTransportRegistration transportRegistration = getTransportRegistration(); ThreadPoolTaskScheduler taskScheduler = messageBrokerSockJsTaskScheduler(); - MessageBrokerRegistry brokerRegistry = getBrokerRegistry(); WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry( - webSocketHandler, transportRegistration, sessionRegistry, taskScheduler, brokerRegistry); + webSocketHandler, transportRegistration, sessionRegistry, taskScheduler); registerStompEndpoints(registry); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketTransportRegistration.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketTransportRegistration.java index 60d829f70fb..22941fb4047 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketTransportRegistration.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketTransportRegistration.java @@ -24,7 +24,7 @@ package org.springframework.web.socket.config.annotation; */ public class WebSocketTransportRegistration { - private Integer messageBufferSizeLimit; + private Integer messageSizeLimit; private Integer sendTimeLimit; @@ -32,9 +32,9 @@ public class WebSocketTransportRegistration { /** - * Configure the maximum size of the buffer to use when an incoming message - * for a sub-protocol (e.g. STOMP) has been split into multiple WebSocket - * messages or multiple HTTP POSTs when SockJS fallback options are in use. + * Configure the maximum size for an incoming sub-protocol message. + * For example a STOMP message may be received as multiple WebSocket messages + * or multiple HTTP POST requests when SockJS fallback options are in use. * *

In theory a WebSocket message can be almost unlimited in size. * In practice WebSocket servers impose limits on incoming message size. @@ -50,16 +50,16 @@ public class WebSocketTransportRegistration { * Version 2 of the spec will but in the mean time existing client libraries * have already established a practice that servers must handle. */ - public WebSocketTransportRegistration setMessageBufferSizeLimit(int bufferSizeLimit) { - this.messageBufferSizeLimit = bufferSizeLimit; + public WebSocketTransportRegistration setMessageSizeLimit(int messageSizeLimit) { + this.messageSizeLimit = messageSizeLimit; return this; } /** * Protected accessor for internal use. */ - protected Integer getMessageBufferSizeLimit() { - return this.messageBufferSizeLimit; + protected Integer getMessageSizeLimit() { + return this.messageSizeLimit; } /** diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 97275eeb62f..19ca5ba901d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -34,7 +34,6 @@ import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.stomp.BufferingStompDecoder; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompConversionException; -import org.springframework.messaging.simp.stomp.StompDecoder; import org.springframework.messaging.simp.stomp.StompEncoder; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.simp.user.DestinationUserNameProvider; @@ -69,7 +68,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler { private static final Log logger = LogFactory.getLog(StompSubProtocolHandler.class); - private int messageBufferSizeLimit = 64 * 1024; + private int messageSizeLimit = 64 * 1024; private final Map decoders = new ConcurrentHashMap(); @@ -79,20 +78,17 @@ public class StompSubProtocolHandler implements SubProtocolHandler { /** - * Configure the maximum size of the buffer used when a STOMP message has been - * split over multiple WebSocket messages. - * - *

While the STOMP spec version 1.2 (current as of 4.0.3) does not discuss - * STOMP over WebSocket explicitly, a number of clients already split messages - * around 16K boundaries. Therefore partial content must be buffered before a - * full message can be assembled. + * Configure the maximum size allowed for an incoming STOMP message. + * Since a STOMP message can be received in multiple WebSocket messages, + * buffering may be required and therefore it is necessary to know the maximum + * allowed message size. * *

By default this property is set to 64K. * * @since 4.0.3 */ - public void setMessageBufferSizeLimit(int messageBufferSizeLimit) { - this.messageBufferSizeLimit = messageBufferSizeLimit; + public void setMessageSizeLimit(int messageSizeLimit) { + this.messageSizeLimit = messageSizeLimit; } /** @@ -100,8 +96,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler { * * @since 4.0.3 */ - public int getMessageBufferSizeLimit() { - return this.messageBufferSizeLimit; + public int getMessageSizeLimit() { + return this.messageSizeLimit; } /** @@ -316,7 +312,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler { @Override public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) { - this.decoders.put(session.getId(), new BufferingStompDecoder(getMessageBufferSizeLimit())); + this.decoders.put(session.getId(), new BufferingStompDecoder(getMessageSizeLimit())); } @Override diff --git a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd index 15941db2449..b77336e7206 100644 --- a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd +++ b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd @@ -457,17 +457,114 @@ + + + + + + + + + + + + + + + + + + + + + + - - - - - protocolHandlers = subProtocolWebSocketHandler.getProtocolHandlers(); for(SubProtocolHandler protocolHandler : protocolHandlers) { assertTrue(protocolHandler instanceof StompSubProtocolHandler); - DirectFieldAccessor protocolHandlerFieldAccessor = new DirectFieldAccessor(protocolHandler); - assertEquals(123, protocolHandlerFieldAccessor.getPropertyValue("messageBufferSizeLimit")); + assertEquals(128 * 1024, ((StompSubProtocolHandler) protocolHandler).getMessageSizeLimit()); } } @@ -150,10 +152,11 @@ public class WebSocketMessageBrokerConfigurationSupportTests { } @Override - public void configureMessageBroker(MessageBrokerRegistry registry) { - registry.setMessageBufferSizeLimit(123); + public void configureWebSocketTransport(WebSocketTransportRegistration registration) { + registration.setMessageSizeLimit(128 * 1024); + registration.setSendTimeLimit(25 * 1000); + registration.setSendBufferSizeLimit(1024 * 1024); } - } @Configuration diff --git a/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml b/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml index e8970ebed35..045c47e2961 100644 --- a/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml +++ b/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml @@ -4,15 +4,22 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd"> - + + + + + + + + diff --git a/src/asciidoc/index.adoc b/src/asciidoc/index.adoc index 606a295973e..efec8207948 100644 --- a/src/asciidoc/index.adoc +++ b/src/asciidoc/index.adoc @@ -37563,8 +37563,7 @@ The Spring Framework provides support for using STOMP over WebSocket through the +spring-messaging+ and +spring-websocket+ modules. It's easy to enable it. Here is an example of configuring a STOMP WebSocket endpoint with SockJS fallback -options. The endpoint is available for clients to connect to at URL path `/app/portfolio`. -It is configured with a 1 Mbytes message buffer size limit (64 Kbytes by default): +options. The endpoint is available for clients to connect to at URL path `/app/portfolio`: [source,java,indent=0] [subs="verbatim,quotes"] @@ -37579,7 +37578,6 @@ It is configured with a 1 Mbytes message buffer size limit (64 Kbytes by default @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.setApplicationDestinationPrefixes("/app") - .setMessageBufferSizeLimit(1024*1024) .enableSimpleBroker("/queue", "/topic"); } @@ -37607,7 +37605,7 @@ XML configuration equivalent: http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd"> - +