From c68b4c01e1afb9c225241f81a8e13a0f1c5c210e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 30 Jul 2014 15:29:46 -0400 Subject: [PATCH] Polish SockJsClient --- .../sockjs/client/AbstractXhrTransport.java | 24 +++- .../socket/sockjs/client/SockJsClient.java | 107 ++++++++++-------- .../web/socket/sockjs/client/Transport.java | 10 ++ .../sockjs/client/WebSocketTransport.java | 8 ++ .../AbstractSockJsIntegrationTests.java | 2 +- .../socket/sockjs/client/TestTransport.java | 15 +++ 6 files changed, 113 insertions(+), 53 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractXhrTransport.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractXhrTransport.java index 44ee209102b..ac9a2874fb4 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractXhrTransport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractXhrTransport.java @@ -29,8 +29,11 @@ import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.sockjs.frame.SockJsFrame; +import org.springframework.web.socket.sockjs.transport.TransportType; import java.net.URI; +import java.util.Arrays; +import java.util.List; /** * Abstract base class for XHR transport implementations to extend. @@ -59,17 +62,32 @@ public abstract class AbstractXhrTransport implements XhrTransport { private HttpHeaders xhrSendRequestHeaders = new HttpHeaders(); + @Override + public List getTransportTypes() { + return (isXhrStreamingDisabled() ? + Arrays.asList(TransportType.XHR) : + Arrays.asList(TransportType.XHR_STREAMING, TransportType.XHR)); + } + /** - * Whether to attempt to connect with "xhr_streaming" first before trying - * with "xhr" next, see {@link XhrTransport#isXhrStreamingDisabled()}. + * An {@code XhrTransport} can support both the "xhr_streaming" and "xhr" + * SockJS server transports. From a client perspective there is no + * implementation difference. + * + *

Typically an {@code XhrTransport} is used as "XHR streaming" first and + * then, if that fails, as "XHR". In some cases however it may be helpful to + * suppress XHR streaming so that only XHR is attempted. * *

By default this property is set to {@code false} which means both - * "xhr_streaming" and "xhr" will be tried. + * "XHR streaming" and "XHR" apply. */ public void setXhrStreamingDisabled(boolean disabled) { this.xhrStreamingDisabled = disabled; } + /** + * Whether XHR streaming is disabled or not. + */ public boolean isXhrStreamingDisabled() { return this.xhrStreamingDisabled; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsClient.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsClient.java index 7b0c3357651..156de8e4984 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsClient.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsClient.java @@ -43,7 +43,11 @@ import java.util.concurrent.ConcurrentHashMap; /** * A SockJS implementation of * {@link org.springframework.web.socket.client.WebSocketClient WebSocketClient} - * with HTTP-based fallback alternative simulating a WebSocket interaction. + * with fallback alternatives that simulate a WebSocket interaction through plain + * HTTP streaming and long polling techniques.. + * + *

Implements {@link Lifecycle} in order to propagate lifecycle events to + * the transports it is configured with. * * @author Rossen Stoyanchev * @since 4.1 @@ -59,27 +63,33 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle { private static final Log logger = LogFactory.getLog(SockJsClient.class); - private final List transports; - private InfoReceiver infoReceiver; + private final List transports; + private SockJsMessageCodec messageCodec; - private TaskScheduler taskScheduler; + private TaskScheduler connectTimeoutScheduler; - private final Map infoCache = new ConcurrentHashMap(); + private final Map serverInfoCache = new ConcurrentHashMap(); private volatile boolean running = false; /** * Create a {@code SockJsClient} with the given transports. - * @param transports the transports to use + * + *

If the list includes an {@link XhrTransport} (or more specifically an + * implementation of {@link InfoReceiver}) the instance is used to initialize + * the {@link #setInfoReceiver(InfoReceiver) infoReceiver} property, or + * otherwise is defaulted to {@link RestTemplateXhrTransport}. + * + * @param transports the (non-empty) list of transports to use */ public SockJsClient(List transports) { Assert.notEmpty(transports, "No transports provided"); - this.transports = new ArrayList(transports); this.infoReceiver = initInfoReceiver(transports); + this.transports = new ArrayList(transports); if (jackson2Present) { this.messageCodec = new Jackson2SockJsMessageCodec(); } @@ -99,16 +109,21 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle { * Configure the {@code InfoReceiver} to use to perform the SockJS "Info" * request before the SockJS session starts. * - *

By default this is initialized either by looking through the configured - * transports to find the first {@code XhrTransport} or by creating an - * instance of {@code RestTemplateXhrTransport}. + *

If the list of transports provided to the constructor contained an + * {@link XhrTransport} or an implementation of {@link InfoReceiver} that + * instance would have been used to initialize this property, or otherwise + * it defaults to {@link RestTemplateXhrTransport}. * * @param infoReceiver the transport to use for the SockJS "Info" request */ public void setInfoReceiver(InfoReceiver infoReceiver) { + Assert.notNull(infoReceiver, "'infoReceiver' is required"); this.infoReceiver = infoReceiver; } + /** + * Return the configured {@code InfoReceiver}, never {@code null}. + */ public InfoReceiver getInfoReceiver() { return this.infoReceiver; } @@ -133,22 +148,19 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle { /** * Configure a {@code TaskScheduler} for scheduling a connect timeout task * where the timeout value is calculated based on the duration of the initial - * SockJS info request. Having a connect timeout task is optional but can - * improve the speed with which the client falls back to alternative - * transport options. - * - *

By default no task scheduler is configured in which case it may take - * longer before a fallback transport can be used. - * - * @param taskScheduler the scheduler to use + * SockJS "Info" request. The connect timeout task ensures a more timely + * fallback but is otherwise entirely optional. + *

By default this is not configured in which case a fallback may take longer. + * @param connectTimeoutScheduler the task scheduler to use */ - public void setTaskScheduler(TaskScheduler taskScheduler) { - this.taskScheduler = taskScheduler; + public void setConnectTimeoutScheduler(TaskScheduler connectTimeoutScheduler) { + this.connectTimeoutScheduler = connectTimeoutScheduler; } @Override public void start() { if (!isRunning()) { + this.running = true; for (Transport transport : this.transports) { if (transport instanceof Lifecycle) { if (!((Lifecycle) transport).isRunning()) { @@ -162,6 +174,7 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle { @Override public void stop() { if (!isRunning()) { + this.running = false; for (Transport transport : this.transports) { if (transport instanceof Lifecycle) { if (((Lifecycle) transport).isRunning()) { @@ -177,8 +190,14 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle { return this.running; } + /** + * By default the result of a SockJS "Info" request, including whether the + * server has WebSocket disabled and how long the request took (used for + * calculating transport timeout time) is cached. This method can be used to + * clear that cache hence causing it to re-populate. + */ public void clearServerInfoCache() { - this.infoCache.clear(); + this.serverInfoCache.clear(); } @Override @@ -198,7 +217,7 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle { try { SockJsUrlInfo sockJsUrlInfo = new SockJsUrlInfo(url); ServerInfo serverInfo = getServerInfo(sockJsUrlInfo); - createFallbackChain(sockJsUrlInfo, handshakeHeaders, serverInfo).connect(handler, connectFuture); + createRequest(sockJsUrlInfo, handshakeHeaders, serverInfo).connect(handler, connectFuture); } catch (Throwable exception) { if (logger.isErrorEnabled()) { @@ -211,52 +230,39 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle { private ServerInfo getServerInfo(SockJsUrlInfo sockJsUrlInfo) { URI infoUrl = sockJsUrlInfo.getInfoUrl(); - ServerInfo info = this.infoCache.get(infoUrl); + ServerInfo info = this.serverInfoCache.get(infoUrl); if (info == null) { long start = System.currentTimeMillis(); String response = this.infoReceiver.executeInfoRequest(infoUrl); long infoRequestTime = System.currentTimeMillis() - start; info = new ServerInfo(response, infoRequestTime); - this.infoCache.put(infoUrl, info); + this.serverInfoCache.put(infoUrl, info); } return info; } - private DefaultTransportRequest createFallbackChain(SockJsUrlInfo urlInfo, HttpHeaders headers, ServerInfo serverInfo) { + private DefaultTransportRequest createRequest(SockJsUrlInfo urlInfo, HttpHeaders headers, ServerInfo serverInfo) { List requests = new ArrayList(this.transports.size()); for (Transport transport : this.transports) { - if (transport instanceof XhrTransport) { - XhrTransport xhrTransport = (XhrTransport) transport; - if (!xhrTransport.isXhrStreamingDisabled()) { - addRequest(requests, urlInfo, headers, serverInfo, transport, TransportType.XHR_STREAMING); + for (TransportType type : transport.getTransportTypes()) { + if (serverInfo.isWebSocketEnabled() || !TransportType.WEBSOCKET.equals(type)) { + requests.add(new DefaultTransportRequest(urlInfo, headers, transport, type, getMessageCodec())); } - addRequest(requests, urlInfo, headers, serverInfo, transport, TransportType.XHR); - } - else if (serverInfo.isWebSocketEnabled()) { - addRequest(requests, urlInfo, headers, serverInfo, transport, TransportType.WEBSOCKET); } } - Assert.notEmpty(requests, - "0 transports for request to " + urlInfo + " . Configured transports: " + - this.transports + ". SockJS server webSocketEnabled=" + serverInfo.isWebSocketEnabled()); + Assert.notEmpty(requests, "No transports, " + urlInfo + ", wsEnabled=" + serverInfo.isWebSocketEnabled()); for (int i = 0; i < requests.size() - 1; i++) { - requests.get(i).setFallbackRequest(requests.get(i + 1)); + DefaultTransportRequest request = requests.get(i); + request.setUser(getUser()); + if (this.connectTimeoutScheduler != null) { + request.setTimeoutValue(serverInfo.getRetransmissionTimeout()); + request.setTimeoutScheduler(this.connectTimeoutScheduler); + } + request.setFallbackRequest(requests.get(i + 1)); } return requests.get(0); } - private void addRequest(List requests, SockJsUrlInfo info, HttpHeaders headers, - ServerInfo serverInfo, Transport transport, TransportType type) { - - DefaultTransportRequest request = new DefaultTransportRequest(info, headers, transport, type, getMessageCodec()); - request.setUser(getUser()); - if (this.taskScheduler != null) { - request.setTimeoutValue(serverInfo.getRetransmissionTimeout()); - request.setTimeoutScheduler(this.taskScheduler); - } - requests.add(request); - } - /** * Return the user to associate with the SockJS session and make available via * {@link org.springframework.web.socket.WebSocketSession#getPrincipal() @@ -269,6 +275,9 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle { } + /** + * A simple value object holding the result from a SockJS "Info" request. + */ private static class ServerInfo { private final boolean webSocketEnabled; diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/Transport.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/Transport.java index 41d554a5ef6..6d8c7d9490b 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/Transport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/Transport.java @@ -19,6 +19,9 @@ package org.springframework.web.socket.sockjs.client; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.sockjs.transport.TransportType; + +import java.util.List; /** * A client-side implementation for a SockJS transport. @@ -28,6 +31,13 @@ import org.springframework.web.socket.WebSocketSession; */ public interface Transport { + /** + * Return the SockJS transport types that this transport can be used for. + * In particular since from a client perspective there is no difference + * between XHR and XHR streaming, an {@code XhrTransport} could do both. + */ + List getTransportTypes(); + /** * Connect the transport. * diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/WebSocketTransport.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/WebSocketTransport.java index ef10ed8e126..474a0f628d4 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/WebSocketTransport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/WebSocketTransport.java @@ -30,8 +30,11 @@ import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.handler.TextWebSocketHandler; +import org.springframework.web.socket.sockjs.transport.TransportType; import java.net.URI; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** @@ -56,6 +59,11 @@ public class WebSocketTransport implements Transport, Lifecycle { } + @Override + public List getTransportTypes() { + return Arrays.asList(TransportType.WEBSOCKET); + } + /** * Return the configured {@code WebSocketClient}. */ diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java index 3de6e6b009a..d40539ba486 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java @@ -218,7 +218,7 @@ public abstract class AbstractSockJsIntegrationTests { this.errorFilter.sleepDelayMap.put("/xhr_streaming", 10000L); this.errorFilter.responseStatusMap.put("/xhr_streaming", 503); initSockJsClient(createXhrTransport()); - this.sockJsClient.setTaskScheduler(this.wac.getBean(ThreadPoolTaskScheduler.class)); + this.sockJsClient.setConnectTimeoutScheduler(this.wac.getBean(ThreadPoolTaskScheduler.class)); WebSocketSession clientSession = sockJsClient.doHandshake(clientHandler, this.baseUrl + "/echo").get(); assertEquals("Fallback didn't occur", XhrClientSockJsSession.class, clientSession.getClass()); TextMessage message = new TextMessage("message1"); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/TestTransport.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/TestTransport.java index f54083b2cec..bea84d99a8e 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/TestTransport.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/TestTransport.java @@ -22,8 +22,11 @@ import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.sockjs.transport.TransportType; import java.net.URI; +import java.util.Arrays; +import java.util.List; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -46,6 +49,11 @@ class TestTransport implements Transport { this.name = name; } + @Override + public List getTransportTypes() { + return Arrays.asList(TransportType.WEBSOCKET); + } + public TransportRequest getRequest() { return this.request; } @@ -84,6 +92,13 @@ class TestTransport implements Transport { super(name); } + @Override + public List getTransportTypes() { + return (isXhrStreamingDisabled() ? + Arrays.asList(TransportType.XHR) : + Arrays.asList(TransportType.XHR_STREAMING, TransportType.XHR)); + } + public void setStreamingDisabled(boolean streamingDisabled) { this.streamingDisabled = streamingDisabled; }