diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java index 9d235711fa..e5a89b6532 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/AbstractWebSocketMessageBrokerConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,11 @@ import java.util.List; */ public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer { + + @Override + public void configureWebSocketTransport(WebSocketTransportRegistration registry) { + } + @Override public void configureClientInboundChannel(ChannelRegistration registration) { } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java index 9743025a9c..26f8c6a89f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java @@ -58,26 +58,40 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry { public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler, - UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler, - MessageBrokerRegistry brokerRegistry) { + WebSocketTransportRegistration transportRegistration, UserSessionRegistry userSessionRegistry, + TaskScheduler defaultSockJsTaskScheduler, MessageBrokerRegistry brokerRegistry) { - Assert.notNull(webSocketHandler); - Assert.notNull(userSessionRegistry); + Assert.notNull(webSocketHandler, "'webSocketHandler' is required "); + Assert.notNull(transportRegistration, "'transportRegistration' is required"); + Assert.notNull(userSessionRegistry, "'userSessionRegistry' is required"); this.webSocketHandler = webSocketHandler; this.subProtocolWebSocketHandler = unwrapSubProtocolWebSocketHandler(webSocketHandler); + + if (transportRegistration.getSendTimeLimit() != null) { + this.subProtocolWebSocketHandler.setSendTimeLimit(transportRegistration.getSendTimeLimit()); + } + if (transportRegistration.getSendBufferSizeLimit() != null) { + this.subProtocolWebSocketHandler.setSendBufferSizeLimit(transportRegistration.getSendBufferSizeLimit()); + } + this.stompHandler = new StompSubProtocolHandler(); this.stompHandler.setUserSessionRegistry(userSessionRegistry); + + if (transportRegistration.getMessageBufferSizeLimit() != null) { + this.stompHandler.setMessageBufferSizeLimit(transportRegistration.getMessageBufferSizeLimit()); + } + this.sockJsScheduler = defaultSockJsTaskScheduler; + if(brokerRegistry.getMessageBufferSizeLimit() != null) { this.stompHandler.setMessageBufferSizeLimit(brokerRegistry.getMessageBufferSizeLimit()); } } - private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) { - WebSocketHandler actual = WebSocketHandlerDecorator.unwrap(webSocketHandler); - Assert.isInstanceOf(SubProtocolWebSocketHandler.class, actual, - "No SubProtocolWebSocketHandler found: " + webSocketHandler); + private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler wsHandler) { + WebSocketHandler actual = WebSocketHandlerDecorator.unwrap(wsHandler); + Assert.isInstanceOf(SubProtocolWebSocketHandler.class, actual, "No SubProtocolWebSocketHandler in " + wsHandler); return (SubProtocolWebSocketHandler) actual; } @@ -85,8 +99,8 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry { @Override public StompWebSocketEndpointRegistration addEndpoint(String... paths) { this.subProtocolWebSocketHandler.addProtocolHandler(this.stompHandler); - WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration( - paths, this.webSocketHandler, this.sockJsScheduler); + WebMvcStompWebSocketEndpointRegistration registration = + new WebMvcStompWebSocketEndpointRegistration(paths, this.webSocketHandler, this.sockJsScheduler); this.registrations.add(registration); return registration; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index 158b6b105c..6a0b8c49d8 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -18,6 +18,8 @@ package org.springframework.web.socket.config.annotation; import org.springframework.context.annotation.Bean; import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; +import org.springframework.messaging.simp.user.UserSessionRegistry; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.servlet.HandlerMapping; import org.springframework.web.socket.WebSocketHandler; @@ -36,15 +38,26 @@ import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; */ public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration { + private WebSocketTransportRegistration transportRegistration; + + protected WebSocketMessageBrokerConfigurationSupport() { } @Bean public HandlerMapping stompWebSocketHandlerMapping() { + + WebSocketHandler webSocketHandler = subProtocolWebSocketHandler(); + UserSessionRegistry sessionRegistry = userSessionRegistry(); + WebSocketTransportRegistration transportRegistration = getTransportRegistration(); + ThreadPoolTaskScheduler taskScheduler = messageBrokerSockJsTaskScheduler(); + MessageBrokerRegistry brokerRegistry = getBrokerRegistry(); + WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry( - subProtocolWebSocketHandler(), userSessionRegistry(), - messageBrokerSockJsTaskScheduler(), getBrokerRegistry()); + webSocketHandler, transportRegistration, sessionRegistry, taskScheduler, brokerRegistry); + registerStompEndpoints(registry); + return registry.getHandlerMapping(); } @@ -53,6 +66,17 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac return new SubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel()); } + protected final WebSocketTransportRegistration getTransportRegistration() { + if (this.transportRegistration == null) { + this.transportRegistration = new WebSocketTransportRegistration(); + configureWebSocketTransport(this.transportRegistration); + } + return this.transportRegistration; + } + + protected void configureWebSocketTransport(WebSocketTransportRegistration registry) { + } + /** * The default TaskScheduler to use if none is configured via * {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e. diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurer.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurer.java index 002b967a5a..c3e8e735d6 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurer.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,10 +35,17 @@ import java.util.List; public interface WebSocketMessageBrokerConfigurer { /** - * Configure STOMP over WebSocket endpoints. + * Register STOMP endpoints mapping each to a specific URL and (optionally) + * enabling and configuring SockJS fallback options. */ void registerStompEndpoints(StompEndpointRegistry registry); + /** + * Configure options related to the processing of messages received from and + * sent to WebSocket clients. + */ + void configureWebSocketTransport(WebSocketTransportRegistration registry); + /** * Configure the {@link org.springframework.messaging.MessageChannel} used for * incoming messages from WebSocket clients. By default the channel is backed diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketTransportRegistration.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketTransportRegistration.java new file mode 100644 index 0000000000..60d829f70f --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketTransportRegistration.java @@ -0,0 +1,150 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.socket.config.annotation; + + +/** + * Configure the processing of messages received from and sent to WebSocket clients. + * + * @author Rossen Stoyanchev + * @since 4.0.3 + */ +public class WebSocketTransportRegistration { + + private Integer messageBufferSizeLimit; + + private Integer sendTimeLimit; + + private Integer sendBufferSizeLimit; + + + /** + * Configure the maximum size of the buffer to use when an incoming message + * for a sub-protocol (e.g. STOMP) has been split into multiple WebSocket + * messages or multiple HTTP POSTs when SockJS fallback options are in use. + * + *

In theory a WebSocket message can be almost unlimited in size. + * In practice WebSocket servers impose limits on incoming message size. + * STOMP clients for example tend to split large messages around 16K + * boundaries. Therefore a server must be able to buffer partial content + * and decode when enough data is received. Use this property to configure + * the max size of the buffer to use. + * + *

The default value is 64K (i.e. 64 * 1024). + * + *

NOTE that the current version 1.2 of the STOMP spec + * does not specifically discuss how to send STOMP messages over WebSocket. + * Version 2 of the spec will but in the mean time existing client libraries + * have already established a practice that servers must handle. + */ + public WebSocketTransportRegistration setMessageBufferSizeLimit(int bufferSizeLimit) { + this.messageBufferSizeLimit = bufferSizeLimit; + return this; + } + + /** + * Protected accessor for internal use. + */ + protected Integer getMessageBufferSizeLimit() { + return this.messageBufferSizeLimit; + } + + /** + * Configure a time limit (in milliseconds) for the maximum amount of a time + * allowed when sending messages to a WebSocket session or writing to an + * HTTP response when SockJS fallback option are in use. + * + *

In general WebSocket servers expect that messages to a single WebSocket + * session are sent from a single thread at a time. This is automatically + * guaranteed when using {@code @EnableWebSocketMessageBroker} configuration. + * If message sending is slow, or at least slower than rate of messages sending, + * subsequent messages are buffered until either the {@code sendTimeLimit} + * or the {@code sendBufferSizeLimit} are reached at which point the session + * state is cleared and an attempt is made to close the session. + * + *

NOTE that the session time limit is checked only + * on attempts to send additional messages. So if only a single message is + * sent and it hangs, the session will not time out until another message is + * sent or the underlying physical socket times out. So this is not a + * replacement for WebSocket server or HTTP connection timeout but is rather + * intended to control the extent of buffering of unsent messages. + * + *

NOTE that closing the session may not succeed in + * actually closing the physical socket and may also hang. This is true + * especially when using blocking IO such as the BIO connector in Tomcat + * that is used by default on Tomcat 7. Therefore it is recommended to ensure + * the server is using non-blocking IO such as Tomcat's NIO connector that + * is used by default on Tomcat 8. If you must use blocking IO consider + * customizing OS-level TCP settings, for example + * {@code /proc/sys/net/ipv4/tcp_retries2} on Linux. + * + *

The default value is 10 seconds (i.e. 10 * 10000). + * + * @param timeLimit the timeout value in milliseconds; the value must be + * greater than 0, otherwise it is ignored. + */ + public WebSocketTransportRegistration setSendTimeLimit(int timeLimit) { + this.sendTimeLimit = timeLimit; + return this; + } + + /** + * Protected accessor for internal use. + */ + protected Integer getSendTimeLimit() { + return this.sendTimeLimit; + } + + /** + * Configure the maximum amount of data to buffer when sending messages + * to a WebSocket session, or an HTTP response when SockJS fallback + * option are in use. + * + *

In general WebSocket servers expect that messages to a single WebSocket + * session are sent from a single thread at a time. This is automatically + * guaranteed when using {@code @EnableWebSocketMessageBroker} configuration. + * If message sending is slow, or at least slower than rate of messages sending, + * subsequent messages are buffered until either the {@code sendTimeLimit} + * or the {@code sendBufferSizeLimit} are reached at which point the session + * state is cleared and an attempt is made to close the session. + * + *

NOTE that closing the session may not succeed in + * actually closing the physical socket and may also hang. This is true + * especially when using blocking IO such as the BIO connector in Tomcat + * configured by default on Tomcat 7. Therefore it is recommended to ensure + * the server is using non-blocking IO such as Tomcat's NIO connector used + * by default on Tomcat 8. If you must use blocking IO consider customizing + * OS-level TCP settings, for example {@code /proc/sys/net/ipv4/tcp_retries2} + * on Linux. + * + *

The default value is 512K (i.e. 512 * 1024). + * + * @param sendBufferSizeLimit the maximum number of bytes to buffer when + * sending messages; if the value is less than or equal to 0 then buffering + * is effectively disabled. + */ + public WebSocketTransportRegistration setSendBufferSizeLimit(int sendBufferSizeLimit) { + this.sendBufferSizeLimit = sendBufferSizeLimit; + return this; + } + + /** + * Protected accessor for internal use. + */ + protected Integer getSendBufferSizeLimit() { + return this.sendBufferSizeLimit; + } +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java index 33947496c2..c00dd32a2a 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java @@ -18,6 +18,7 @@ package org.springframework.web.socket.handler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.util.Assert; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java index 20968d6db5..c51d38e543 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java @@ -78,7 +78,7 @@ public class SubProtocolWebSocketHandler private int sendTimeLimit = 10 * 1000; - private int sendBufferSizeLimit = 64 * 1024; + private int sendBufferSizeLimit = 512 * 1024; private Object lifecycleMonitor = new Object(); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistryTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistryTests.java index b26ad06593..34ac025a06 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistryTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistryTests.java @@ -58,7 +58,7 @@ public class WebMvcStompEndpointRegistryTests { this.userSessionRegistry = new DefaultUserSessionRegistry(); this.messageBrokerRegistry = new MessageBrokerRegistry(inChannel, outChannel); TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class); - this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, userSessionRegistry, + this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, transportRegistration, userSessionRegistry, taskScheduler, messageBrokerRegistry); }