Add WebSocketTransportRegistration

Issue: SPR-11527
This commit is contained in:
Rossen Stoyanchev 2014-03-24 16:12:40 -04:00
parent 545c4effb1
commit 1e9960e1ce
8 changed files with 218 additions and 17 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -31,6 +31,11 @@ import java.util.List;
*/
public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
}

View File

@ -58,26 +58,40 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry {
public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler,
UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler,
MessageBrokerRegistry brokerRegistry) {
WebSocketTransportRegistration transportRegistration, UserSessionRegistry userSessionRegistry,
TaskScheduler defaultSockJsTaskScheduler, MessageBrokerRegistry brokerRegistry) {
Assert.notNull(webSocketHandler);
Assert.notNull(userSessionRegistry);
Assert.notNull(webSocketHandler, "'webSocketHandler' is required ");
Assert.notNull(transportRegistration, "'transportRegistration' is required");
Assert.notNull(userSessionRegistry, "'userSessionRegistry' is required");
this.webSocketHandler = webSocketHandler;
this.subProtocolWebSocketHandler = unwrapSubProtocolWebSocketHandler(webSocketHandler);
if (transportRegistration.getSendTimeLimit() != null) {
this.subProtocolWebSocketHandler.setSendTimeLimit(transportRegistration.getSendTimeLimit());
}
if (transportRegistration.getSendBufferSizeLimit() != null) {
this.subProtocolWebSocketHandler.setSendBufferSizeLimit(transportRegistration.getSendBufferSizeLimit());
}
this.stompHandler = new StompSubProtocolHandler();
this.stompHandler.setUserSessionRegistry(userSessionRegistry);
if (transportRegistration.getMessageBufferSizeLimit() != null) {
this.stompHandler.setMessageBufferSizeLimit(transportRegistration.getMessageBufferSizeLimit());
}
this.sockJsScheduler = defaultSockJsTaskScheduler;
if(brokerRegistry.getMessageBufferSizeLimit() != null) {
this.stompHandler.setMessageBufferSizeLimit(brokerRegistry.getMessageBufferSizeLimit());
}
}
private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) {
WebSocketHandler actual = WebSocketHandlerDecorator.unwrap(webSocketHandler);
Assert.isInstanceOf(SubProtocolWebSocketHandler.class, actual,
"No SubProtocolWebSocketHandler found: " + webSocketHandler);
private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler wsHandler) {
WebSocketHandler actual = WebSocketHandlerDecorator.unwrap(wsHandler);
Assert.isInstanceOf(SubProtocolWebSocketHandler.class, actual, "No SubProtocolWebSocketHandler in " + wsHandler);
return (SubProtocolWebSocketHandler) actual;
}
@ -85,8 +99,8 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry {
@Override
public StompWebSocketEndpointRegistration addEndpoint(String... paths) {
this.subProtocolWebSocketHandler.addProtocolHandler(this.stompHandler);
WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration(
paths, this.webSocketHandler, this.sockJsScheduler);
WebMvcStompWebSocketEndpointRegistration registration =
new WebMvcStompWebSocketEndpointRegistration(paths, this.webSocketHandler, this.sockJsScheduler);
this.registrations.add(registration);
return registration;
}

View File

@ -18,6 +18,8 @@ package org.springframework.web.socket.config.annotation;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.user.UserSessionRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
@ -36,15 +38,26 @@ import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
*/
public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration {
private WebSocketTransportRegistration transportRegistration;
protected WebSocketMessageBrokerConfigurationSupport() {
}
@Bean
public HandlerMapping stompWebSocketHandlerMapping() {
WebSocketHandler webSocketHandler = subProtocolWebSocketHandler();
UserSessionRegistry sessionRegistry = userSessionRegistry();
WebSocketTransportRegistration transportRegistration = getTransportRegistration();
ThreadPoolTaskScheduler taskScheduler = messageBrokerSockJsTaskScheduler();
MessageBrokerRegistry brokerRegistry = getBrokerRegistry();
WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
subProtocolWebSocketHandler(), userSessionRegistry(),
messageBrokerSockJsTaskScheduler(), getBrokerRegistry());
webSocketHandler, transportRegistration, sessionRegistry, taskScheduler, brokerRegistry);
registerStompEndpoints(registry);
return registry.getHandlerMapping();
}
@ -53,6 +66,17 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
return new SubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
}
protected final WebSocketTransportRegistration getTransportRegistration() {
if (this.transportRegistration == null) {
this.transportRegistration = new WebSocketTransportRegistration();
configureWebSocketTransport(this.transportRegistration);
}
return this.transportRegistration;
}
protected void configureWebSocketTransport(WebSocketTransportRegistration registry) {
}
/**
* The default TaskScheduler to use if none is configured via
* {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,10 +35,17 @@ import java.util.List;
public interface WebSocketMessageBrokerConfigurer {
/**
* Configure STOMP over WebSocket endpoints.
* Register STOMP endpoints mapping each to a specific URL and (optionally)
* enabling and configuring SockJS fallback options.
*/
void registerStompEndpoints(StompEndpointRegistry registry);
/**
* Configure options related to the processing of messages received from and
* sent to WebSocket clients.
*/
void configureWebSocketTransport(WebSocketTransportRegistration registry);
/**
* Configure the {@link org.springframework.messaging.MessageChannel} used for
* incoming messages from WebSocket clients. By default the channel is backed

View File

@ -0,0 +1,150 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.config.annotation;
/**
* Configure the processing of messages received from and sent to WebSocket clients.
*
* @author Rossen Stoyanchev
* @since 4.0.3
*/
public class WebSocketTransportRegistration {
private Integer messageBufferSizeLimit;
private Integer sendTimeLimit;
private Integer sendBufferSizeLimit;
/**
* Configure the maximum size of the buffer to use when an incoming message
* for a sub-protocol (e.g. STOMP) has been split into multiple WebSocket
* messages or multiple HTTP POSTs when SockJS fallback options are in use.
*
* <p>In theory a WebSocket message can be almost unlimited in size.
* In practice WebSocket servers impose limits on incoming message size.
* STOMP clients for example tend to split large messages around 16K
* boundaries. Therefore a server must be able to buffer partial content
* and decode when enough data is received. Use this property to configure
* the max size of the buffer to use.
*
* <p>The default value is 64K (i.e. 64 * 1024).
*
* <p><strong>NOTE</strong> that the current version 1.2 of the STOMP spec
* does not specifically discuss how to send STOMP messages over WebSocket.
* Version 2 of the spec will but in the mean time existing client libraries
* have already established a practice that servers must handle.
*/
public WebSocketTransportRegistration setMessageBufferSizeLimit(int bufferSizeLimit) {
this.messageBufferSizeLimit = bufferSizeLimit;
return this;
}
/**
* Protected accessor for internal use.
*/
protected Integer getMessageBufferSizeLimit() {
return this.messageBufferSizeLimit;
}
/**
* Configure a time limit (in milliseconds) for the maximum amount of a time
* allowed when sending messages to a WebSocket session or writing to an
* HTTP response when SockJS fallback option are in use.
*
* <p>In general WebSocket servers expect that messages to a single WebSocket
* session are sent from a single thread at a time. This is automatically
* guaranteed when using {@code @EnableWebSocketMessageBroker} configuration.
* If message sending is slow, or at least slower than rate of messages sending,
* subsequent messages are buffered until either the {@code sendTimeLimit}
* or the {@code sendBufferSizeLimit} are reached at which point the session
* state is cleared and an attempt is made to close the session.
*
* <p><strong>NOTE</strong> that the session time limit is checked only
* on attempts to send additional messages. So if only a single message is
* sent and it hangs, the session will not time out until another message is
* sent or the underlying physical socket times out. So this is not a
* replacement for WebSocket server or HTTP connection timeout but is rather
* intended to control the extent of buffering of unsent messages.
*
* <p><strong>NOTE</strong> that closing the session may not succeed in
* actually closing the physical socket and may also hang. This is true
* especially when using blocking IO such as the BIO connector in Tomcat
* that is used by default on Tomcat 7. Therefore it is recommended to ensure
* the server is using non-blocking IO such as Tomcat's NIO connector that
* is used by default on Tomcat 8. If you must use blocking IO consider
* customizing OS-level TCP settings, for example
* {@code /proc/sys/net/ipv4/tcp_retries2} on Linux.
*
* <p>The default value is 10 seconds (i.e. 10 * 10000).
*
* @param timeLimit the timeout value in milliseconds; the value must be
* greater than 0, otherwise it is ignored.
*/
public WebSocketTransportRegistration setSendTimeLimit(int timeLimit) {
this.sendTimeLimit = timeLimit;
return this;
}
/**
* Protected accessor for internal use.
*/
protected Integer getSendTimeLimit() {
return this.sendTimeLimit;
}
/**
* Configure the maximum amount of data to buffer when sending messages
* to a WebSocket session, or an HTTP response when SockJS fallback
* option are in use.
*
* <p>In general WebSocket servers expect that messages to a single WebSocket
* session are sent from a single thread at a time. This is automatically
* guaranteed when using {@code @EnableWebSocketMessageBroker} configuration.
* If message sending is slow, or at least slower than rate of messages sending,
* subsequent messages are buffered until either the {@code sendTimeLimit}
* or the {@code sendBufferSizeLimit} are reached at which point the session
* state is cleared and an attempt is made to close the session.
*
* <p><strong>NOTE</strong> that closing the session may not succeed in
* actually closing the physical socket and may also hang. This is true
* especially when using blocking IO such as the BIO connector in Tomcat
* configured by default on Tomcat 7. Therefore it is recommended to ensure
* the server is using non-blocking IO such as Tomcat's NIO connector used
* by default on Tomcat 8. If you must use blocking IO consider customizing
* OS-level TCP settings, for example {@code /proc/sys/net/ipv4/tcp_retries2}
* on Linux.
*
* <p>The default value is 512K (i.e. 512 * 1024).
*
* @param sendBufferSizeLimit the maximum number of bytes to buffer when
* sending messages; if the value is less than or equal to 0 then buffering
* is effectively disabled.
*/
public WebSocketTransportRegistration setSendBufferSizeLimit(int sendBufferSizeLimit) {
this.sendBufferSizeLimit = sendBufferSizeLimit;
return this;
}
/**
* Protected accessor for internal use.
*/
protected Integer getSendBufferSizeLimit() {
return this.sendBufferSizeLimit;
}
}

View File

@ -18,6 +18,7 @@ package org.springframework.web.socket.handler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

View File

@ -78,7 +78,7 @@ public class SubProtocolWebSocketHandler
private int sendTimeLimit = 10 * 1000;
private int sendBufferSizeLimit = 64 * 1024;
private int sendBufferSizeLimit = 512 * 1024;
private Object lifecycleMonitor = new Object();

View File

@ -58,7 +58,7 @@ public class WebMvcStompEndpointRegistryTests {
this.userSessionRegistry = new DefaultUserSessionRegistry();
this.messageBrokerRegistry = new MessageBrokerRegistry(inChannel, outChannel);
TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class);
this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, userSessionRegistry,
this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, transportRegistration, userSessionRegistry,
taskScheduler, messageBrokerRegistry);
}