Add configurability for underlying WebSocket engine

Issue: SPR-10844
This commit is contained in:
Rossen Stoyanchev 2013-08-22 20:35:03 -04:00
parent ccaa101252
commit a5143057ce
19 changed files with 222 additions and 72 deletions

View File

@ -30,14 +30,16 @@ import org.springframework.web.socket.WebSocketSession;
/**
* An abstract base class for implementations of {@link WebSocketSession}.
*
* @param T the type of the native (or delegate) WebSocket session
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractWebSocketSesssion<T> implements DelegatingWebSocketSession<T> {
public abstract class AbstractWebSocketSesssion<T> implements WebSocketSession, NativeWebSocketSession {
protected final Log logger = LogFactory.getLog(getClass());
private T delegateSession;
private T nativeSession;
private final Map<String, Object> handshakeAttributes;
@ -58,28 +60,35 @@ public abstract class AbstractWebSocketSesssion<T> implements DelegatingWebSocke
return this.handshakeAttributes;
}
/**
* @return the WebSocket session to delegate to
*/
public T getDelegateSession() {
return this.delegateSession;
}
@Override
public void afterSessionInitialized(T session) {
Assert.notNull(session, "session must not be null");
this.delegateSession = session;
public T getNativeSession() {
return this.nativeSession;
}
protected final void checkDelegateSessionInitialized() {
Assert.state(this.delegateSession != null, "WebSocket session is not yet initialized");
@SuppressWarnings("unchecked")
@Override
public <R> R getNativeSession(Class<R> requiredType) {
if (requiredType != null) {
if (requiredType.isInstance(this.nativeSession)) {
return (R) this.nativeSession;
}
}
return null;
}
public void initializeNativeSession(T session) {
Assert.notNull(session, "session must not be null");
this.nativeSession = session;
}
protected final void checkNativeSessionInitialized() {
Assert.state(this.nativeSession != null, "WebSocket session is not yet initialized");
}
@Override
public final void sendMessage(WebSocketMessage message) throws IOException {
checkDelegateSessionInitialized();
checkNativeSessionInitialized();
Assert.isTrue(isOpen(), "Cannot send message after connection closed.");
if (logger.isTraceEnabled()) {
@ -109,7 +118,7 @@ public abstract class AbstractWebSocketSesssion<T> implements DelegatingWebSocke
@Override
public final void close(CloseStatus status) throws IOException {
checkDelegateSessionInitialized();
checkNativeSessionInitialized();
if (logger.isDebugEnabled()) {
logger.debug("Closing " + this);
}

View File

@ -54,7 +54,7 @@ public class JettyWebSocketHandlerAdapter implements WebSocketListener {
@Override
public void onWebSocketConnect(Session session) {
try {
this.wsSession.afterSessionInitialized(session);
this.wsSession.initializeNativeSession(session);
this.webSocketHandler.afterConnectionEstablished(this.wsSession);
}
catch (Throwable t) {

View File

@ -58,22 +58,22 @@ public class JettyWebSocketSession extends AbstractWebSocketSesssion<org.eclipse
@Override
public String getId() {
checkDelegateSessionInitialized();
return ObjectUtils.getIdentityHexString(getDelegateSession());
checkNativeSessionInitialized();
return ObjectUtils.getIdentityHexString(getNativeSession());
}
@Override
public URI getUri() {
checkDelegateSessionInitialized();
return getDelegateSession().getUpgradeRequest().getRequestURI();
checkNativeSessionInitialized();
return getNativeSession().getUpgradeRequest().getRequestURI();
}
@Override
public HttpHeaders getHandshakeHeaders() {
checkDelegateSessionInitialized();
checkNativeSessionInitialized();
if (this.headers == null) {
this.headers = new HttpHeaders();
this.headers.putAll(getDelegateSession().getUpgradeRequest().getHeaders());
this.headers.putAll(getNativeSession().getUpgradeRequest().getHeaders());
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
}
return this.headers;
@ -86,40 +86,40 @@ public class JettyWebSocketSession extends AbstractWebSocketSesssion<org.eclipse
@Override
public InetSocketAddress getLocalAddress() {
checkDelegateSessionInitialized();
return getDelegateSession().getLocalAddress();
checkNativeSessionInitialized();
return getNativeSession().getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress() {
checkDelegateSessionInitialized();
return getDelegateSession().getRemoteAddress();
checkNativeSessionInitialized();
return getNativeSession().getRemoteAddress();
}
@Override
public String getAcceptedProtocol() {
checkDelegateSessionInitialized();
return getDelegateSession().getUpgradeResponse().getAcceptedSubProtocol();
checkNativeSessionInitialized();
return getNativeSession().getUpgradeResponse().getAcceptedSubProtocol();
}
@Override
public boolean isOpen() {
return ((getDelegateSession() != null) && getDelegateSession().isOpen());
return ((getNativeSession() != null) && getNativeSession().isOpen());
}
@Override
protected void sendTextMessage(TextMessage message) throws IOException {
getDelegateSession().getRemote().sendString(message.getPayload());
getNativeSession().getRemote().sendString(message.getPayload());
}
@Override
protected void sendBinaryMessage(BinaryMessage message) throws IOException {
getDelegateSession().getRemote().sendBytes(message.getPayload());
getNativeSession().getRemote().sendBytes(message.getPayload());
}
@Override
protected void closeInternal(CloseStatus status) throws IOException {
getDelegateSession().close(status.getCode(), status.getReason());
getNativeSession().close(status.getCode(), status.getReason());
}
}

View File

@ -20,21 +20,26 @@ import org.springframework.web.socket.WebSocketSession;
/**
* A contract for a {@link WebSocketSession} that delegates to another WebSocket session
* (e.g. a native session).
*
* @param T the type of the delegate WebSocket session
* A {@link WebSocketSession} that exposes the underlying, native WebSocketSession
* through a getter.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface DelegatingWebSocketSession<T> extends WebSocketSession {
public interface NativeWebSocketSession extends WebSocketSession {
/**
* Invoked when the delegate WebSocket session has been initialized.
* Return the underlying native WebSocketSession, if available.
* @return the native session or {@code null}
*/
void afterSessionInitialized(T session);
Object getNativeSession();
/**
* Return the underlying native WebSocketSession, if available.
* @param requiredType the required type of the session
* @return the native session of the required type or {@code null}
*/
<T> T getNativeSession(Class<T> requiredType);
}

View File

@ -58,7 +58,7 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
@Override
public void onOpen(final javax.websocket.Session session, EndpointConfig config) {
this.wsSession.afterSessionInitialized(session);
this.wsSession.initializeNativeSession(session);
if (this.handler.supportsPartialMessages()) {
session.addMessageHandler(new MessageHandler.Partial<String>() {

View File

@ -68,14 +68,14 @@ public class StandardWebSocketSession extends AbstractWebSocketSesssion<javax.we
@Override
public String getId() {
checkDelegateSessionInitialized();
return getDelegateSession().getId();
checkNativeSessionInitialized();
return getNativeSession().getId();
}
@Override
public URI getUri() {
checkDelegateSessionInitialized();
return getDelegateSession().getRequestURI();
checkNativeSessionInitialized();
return getNativeSession().getRequestURI();
}
@Override
@ -85,8 +85,8 @@ public class StandardWebSocketSession extends AbstractWebSocketSesssion<javax.we
@Override
public Principal getPrincipal() {
checkDelegateSessionInitialized();
return getDelegateSession().getUserPrincipal();
checkNativeSessionInitialized();
return getNativeSession().getUserPrincipal();
}
@Override
@ -101,29 +101,29 @@ public class StandardWebSocketSession extends AbstractWebSocketSesssion<javax.we
@Override
public String getAcceptedProtocol() {
checkDelegateSessionInitialized();
String protocol = getDelegateSession().getNegotiatedSubprotocol();
checkNativeSessionInitialized();
String protocol = getNativeSession().getNegotiatedSubprotocol();
return StringUtils.isEmpty(protocol)? null : protocol;
}
@Override
public boolean isOpen() {
return ((getDelegateSession() != null) && getDelegateSession().isOpen());
return ((getNativeSession() != null) && getNativeSession().isOpen());
}
@Override
protected void sendTextMessage(TextMessage message) throws IOException {
getDelegateSession().getBasicRemote().sendText(message.getPayload(), message.isLast());
getNativeSession().getBasicRemote().sendText(message.getPayload(), message.isLast());
}
@Override
protected void sendBinaryMessage(BinaryMessage message) throws IOException {
getDelegateSession().getBasicRemote().sendBinary(message.getPayload(), message.isLast());
getNativeSession().getBasicRemote().sendBinary(message.getPayload(), message.isLast());
}
@Override
protected void closeInternal(CloseStatus status) throws IOException {
getDelegateSession().close(new CloseReason(CloseCodes.getCloseCode(status.getCode()), status.getReason()));
getNativeSession().close(new CloseReason(CloseCodes.getCloseCode(status.getCode()), status.getReason()));
}
}

View File

@ -52,10 +52,20 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
private final WebSocketContainer webSocketContainer;
/**
* Default constructor that calls {@code ContainerProvider.getWebSocketContainer()} to
* obtain a {@link WebSocketContainer} instance.
*/
public StandardWebSocketClient() {
this.webSocketContainer = ContainerProvider.getWebSocketContainer();
}
/**
* Constructor that accepts a pre-configured {@link WebSocketContainer} instance. If
* using XML configuration see {@link WebSocketContainerFactoryBean}. In Java
* configuration use {@code ContainerProvider.getWebSocketContainer()} to obtain
* a container instance.
*/
public StandardWebSocketClient(WebSocketContainer webSocketContainer) {
Assert.notNull(webSocketContainer, "webSocketContainer must not be null");
this.webSocketContainer = webSocketContainer;
@ -63,9 +73,9 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
@Override
protected WebSocketSession doHandshakeInternal(WebSocketHandler webSocketHandler, HttpHeaders headers,
URI uri, List<String> protocols, Map<String, Object> handshakeAttributes)
throws WebSocketConnectFailureException {
protected WebSocketSession doHandshakeInternal(WebSocketHandler webSocketHandler,
HttpHeaders headers, URI uri, List<String> protocols,
Map<String, Object> handshakeAttributes) throws WebSocketConnectFailureException {
int port = getPort(uri);
InetSocketAddress localAddress = new InetSocketAddress(getLocalHost(), port);

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.springframework.context.SmartLifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.web.socket.WebSocketHandler;
@ -51,12 +52,22 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma
private final Object lifecycleMonitor = new Object();
/**
* Default constructor that creates an instance of
* {@link org.eclipse.jetty.websocket.client.WebSocketClient} with default settings.
*/
public JettyWebSocketClient() {
this.client = new org.eclipse.jetty.websocket.client.WebSocketClient();
}
/**
* Constructor that accepts a pre-configured {@link WebSocketClient}.
*/
public JettyWebSocketClient(WebSocketClient client) {
super();
this.client = client;
}
// TODO: configure Jetty WebSocketClient properties
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;

View File

@ -50,10 +50,15 @@ import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.server.HandshakeFailureException;
import org.springframework.web.socket.server.endpoint.ServerEndpointRegistration;
import org.springframework.web.socket.server.endpoint.ServletServerContainerFactoryBean;
/**
* GlassFish support for upgrading a request during a WebSocket handshake.
* GlassFish support for upgrading a request during a WebSocket handshake. To modify
* properties of the underlying {@link javax.websocket.server.ServerContainer} you can use
* {@link ServletServerContainerFactoryBean} in XML configuration or if using Java
* configuration, access the container instance through the
* "javax.websocket.server.ServerContainer" ServletContext attribute.
*
* @author Rossen Stoyanchev
* @since 4.0

View File

@ -24,6 +24,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.server.HandshakeRFC6455;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
@ -55,10 +56,19 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
/**
* Default constructor.
* Default constructor that creates {@link WebSocketServerFactory} through its default
* constructor thus using a default {@link WebSocketPolicy}.
*/
public JettyRequestUpgradeStrategy() {
this.factory = new WebSocketServerFactory();
this(new WebSocketServerFactory());
}
/**
* A constructor accepting a {@link WebSocketServerFactory}. This may be useful for
* modifying the factory's {@link WebSocketPolicy} via
* {@link WebSocketServerFactory#getPolicy()}.
*/
public JettyRequestUpgradeStrategy(WebSocketServerFactory factory) {
this.factory.setCreator(new WebSocketCreator() {
@Override
public Object createWebSocket(UpgradeRequest request, UpgradeResponse response) {

View File

@ -40,9 +40,15 @@ import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.socket.server.HandshakeFailureException;
import org.springframework.web.socket.server.endpoint.ServerEndpointRegistration;
import org.springframework.web.socket.server.endpoint.ServletServerContainerFactoryBean;
/**
* Tomcat support for upgrading an {@link HttpServletRequest} during a WebSocket handshake.
* Tomcat support for upgrading an {@link HttpServletRequest} during a WebSocket
* handshake. To modify properties of the underlying
* {@link javax.websocket.server.ServerContainer} you can use
* {@link ServletServerContainerFactoryBean} in XML configuration or if using Java
* configuration, access the container instance through the
* "javax.websocket.server.ServerContainer" ServletContext attribute.
*
* @author Rossen Stoyanchev
* @since 4.0
@ -96,7 +102,7 @@ public class TomcatRequestUpgradeStrategy extends AbstractStandardUpgradeStrateg
}
}
private WsServerContainer getContainer(HttpServletRequest servletRequest) {
public WsServerContainer getContainer(HttpServletRequest servletRequest) {
String attribute = "javax.websocket.server.ServerContainer";
ServletContext servletContext = servletRequest.getServletContext();
return (WsServerContainer) servletContext.getAttribute(attribute);

View File

@ -83,6 +83,8 @@ public abstract class AbstractSockJsService implements SockJsService {
private long disconnectDelay = 5 * 1000;
private int httpMessageCacheSize = 100;
private boolean webSocketsEnabled = true;
private final TaskScheduler taskScheduler;
@ -244,6 +246,28 @@ public abstract class AbstractSockJsService implements SockJsService {
return this.disconnectDelay;
}
/**
* The number of server-to-client messages that a session can cache while waiting for
* the next HTTP polling request from the client. All HTTP transports use this
* property since even streaming transports recycle HTTP requests periodically.
* <p>
* The amount of time between HTTP requests should be relatively brief and will not
* exceed the allows disconnect delay (see
* {@link #setDisconnectDelay(long)}), 5 seconds by default.
* <p>
* The default size is 100.
*/
public void setHttpMessageCacheSize(int httpMessageCacheSize) {
this.httpMessageCacheSize = httpMessageCacheSize;
}
/**
* Return the size of the HTTP message cache.
*/
public int getHttpMessageCacheSize() {
return this.httpMessageCacheSize;
}
/**
* Some load balancers don't support websockets. This option can be used to
* disable the WebSocket transport on the server side.

View File

@ -425,6 +425,11 @@ public class DefaultSockJsService extends AbstractSockJsService {
+ " Either add Jackson 2 or Jackson 1.x to the classpath, or configure a SockJsMessageCode");
return DefaultSockJsService.this.getMessageCodec();
}
@Override
public int getHttpMessageCacheSize() {
return DefaultSockJsService.this.getHttpMessageCacheSize();
}
};
}

View File

@ -69,7 +69,7 @@ public class SockJsWebSocketHandler extends TextWebSocketHandlerAdapter {
@Override
public void afterConnectionEstablished(WebSocketSession wsSession) throws Exception {
Assert.isTrue(this.sessionCount.compareAndSet(0, 1), "Unexpected connection");
this.sockJsSession.afterSessionInitialized(wsSession);
this.sockJsSession.initializeDelegateSession(wsSession);
}
@Override

View File

@ -46,7 +46,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
private FrameFormat frameFormat;
private final BlockingQueue<String> messageCache = new ArrayBlockingQueue<String>(100);
private final BlockingQueue<String> messageCache;
private ServerHttpRequest request;
@ -71,6 +71,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
WebSocketHandler wsHandler, Map<String, Object> handshakeAttributes) {
super(id, config, wsHandler, handshakeAttributes);
this.messageCache = new ArrayBlockingQueue<String>(config.getHttpMessageCacheSize());
}

View File

@ -18,6 +18,7 @@ package org.springframework.web.socket.sockjs.transport.session;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.socket.sockjs.SockJsService;
import org.springframework.web.socket.sockjs.support.AbstractSockJsService;
import org.springframework.web.socket.sockjs.support.frame.SockJsMessageCodec;
/**
@ -29,12 +30,50 @@ import org.springframework.web.socket.sockjs.support.frame.SockJsMessageCodec;
*/
public interface SockJsServiceConfig {
/**
* Streaming transports save responses on the client side and don't free
* memory used by delivered messages. Such transports need to recycle the
* connection once in a while. This property sets a minimum number of bytes
* that can be send over a single HTTP streaming request before it will be
* closed. After that client will open a new request. Setting this value to
* one effectively disables streaming and will make streaming transports to
* behave like polling transports.
*
* <p>The default value is 128K (i.e. 128 * 1024).
*/
int getStreamBytesLimit();
/**
* The amount of time in milliseconds when the server has not sent any
* messages and after which the server should send a heartbeat frame to the
* client in order to keep the connection from breaking.
*
* <p>The default value is 25,000 (25 seconds).
*/
long getHeartbeatTime();
/**
* A scheduler instance to use for scheduling heart-beat messages.
*/
TaskScheduler getTaskScheduler();
/**
* The codec to use for encoding and decoding SockJS messages.
* @exception IllegalStateException if no {@link SockJsMessageCodec} is available
*/
SockJsMessageCodec getMessageCodec();
/**
* The number of server-to-client messages that a session can cache while waiting for
* the next HTTP polling request from the client. All HTTP transports use this
* property since even streaming transports recycle HTTP requests periodically.
* <p>
* The amount of time between HTTP requests should be relatively brief and will not
* exceed the allows disconnect delay (see
* {@link AbstractSockJsService#setDisconnectDelay(long)}, 5 seconds by default.
* <p>
* The default size is 100.
*/
int getHttpMessageCacheSize();
}

View File

@ -29,7 +29,7 @@ import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.DelegatingWebSocketSession;
import org.springframework.web.socket.adapter.NativeWebSocketSession;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.support.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.support.frame.SockJsMessageCodec;
@ -41,7 +41,7 @@ import org.springframework.web.socket.sockjs.support.frame.SockJsMessageCodec;
* @since 4.0
*/
public class WebSocketServerSockJsSession extends AbstractSockJsSession
implements DelegatingWebSocketSession<WebSocketSession> {
implements WebSocketSession, NativeWebSocketSession {
private WebSocketSession wsSession;
@ -93,9 +93,24 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession
Assert.state(this.wsSession != null, "WebSocketSession not yet initialized");
}
@Override
public Object getNativeSession() {
if ((this.wsSession != null) && (this.wsSession instanceof NativeWebSocketSession)) {
return ((NativeWebSocketSession) this.wsSession).getNativeSession();
}
return null;
}
@Override
public void afterSessionInitialized(WebSocketSession session) {
public <T> T getNativeSession(Class<T> requiredType) {
if ((this.wsSession != null) && (this.wsSession instanceof NativeWebSocketSession)) {
return ((NativeWebSocketSession) this.wsSession).getNativeSession(requiredType);
}
return null;
}
public void initializeDelegateSession(WebSocketSession session) {
this.wsSession = session;
try {
TextMessage message = new TextMessage(SockJsFrame.openFrame().getContent());

View File

@ -34,6 +34,8 @@ public class StubSockJsServiceConfig implements SockJsServiceConfig {
private SockJsMessageCodec messageCodec = new Jackson2SockJsMessageCodec();
private int httpMessageCacheSize = 100;
@Override
public int getStreamBytesLimit() {
@ -71,4 +73,12 @@ public class StubSockJsServiceConfig implements SockJsServiceConfig {
this.messageCodec = messageCodec;
}
public int getHttpMessageCacheSize() {
return this.httpMessageCacheSize;
}
public void setHttpMessageCacheSize(int httpMessageCacheSize) {
this.httpMessageCacheSize = httpMessageCacheSize;
}
}

View File

@ -62,7 +62,7 @@ public class WebSocketServerSockJsSessionTests extends BaseAbstractSockJsSession
public void isActive() throws Exception {
assertFalse(this.session.isActive());
this.session.afterSessionInitialized(this.webSocketSession);
this.session.initializeDelegateSession(this.webSocketSession);
assertTrue(this.session.isActive());
this.webSocketSession.setOpen(false);
@ -72,7 +72,7 @@ public class WebSocketServerSockJsSessionTests extends BaseAbstractSockJsSession
@Test
public void afterSessionInitialized() throws Exception {
this.session.afterSessionInitialized(this.webSocketSession);
this.session.initializeDelegateSession(this.webSocketSession);
assertEquals("Open frame not sent",
Collections.singletonList(new TextMessage("o")), this.webSocketSession.getSentMessages());
@ -111,7 +111,7 @@ public class WebSocketServerSockJsSessionTests extends BaseAbstractSockJsSession
@Test
public void sendMessageInternal() throws Exception {
this.session.afterSessionInitialized(this.webSocketSession);
this.session.initializeDelegateSession(this.webSocketSession);
this.session.sendMessageInternal("x");
assertEquals(Arrays.asList(new TextMessage("o"), new TextMessage("a[\"x\"]")),
@ -123,7 +123,7 @@ public class WebSocketServerSockJsSessionTests extends BaseAbstractSockJsSession
@Test
public void disconnect() throws Exception {
this.session.afterSessionInitialized(this.webSocketSession);
this.session.initializeDelegateSession(this.webSocketSession);
this.session.close(CloseStatus.NOT_ACCEPTABLE);
assertEquals(CloseStatus.NOT_ACCEPTABLE, this.webSocketSession.getCloseStatus());