Polish SockJsClient
This commit is contained in:
parent
ba952ca331
commit
c68b4c01e1
|
|
@ -29,8 +29,11 @@ import org.springframework.web.socket.TextMessage;
|
|||
import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
|
||||
import org.springframework.web.socket.sockjs.transport.TransportType;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Abstract base class for XHR transport implementations to extend.
|
||||
|
|
@ -59,17 +62,32 @@ public abstract class AbstractXhrTransport implements XhrTransport {
|
|||
private HttpHeaders xhrSendRequestHeaders = new HttpHeaders();
|
||||
|
||||
|
||||
@Override
|
||||
public List<TransportType> getTransportTypes() {
|
||||
return (isXhrStreamingDisabled() ?
|
||||
Arrays.asList(TransportType.XHR) :
|
||||
Arrays.asList(TransportType.XHR_STREAMING, TransportType.XHR));
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether to attempt to connect with "xhr_streaming" first before trying
|
||||
* with "xhr" next, see {@link XhrTransport#isXhrStreamingDisabled()}.
|
||||
* An {@code XhrTransport} can support both the "xhr_streaming" and "xhr"
|
||||
* SockJS server transports. From a client perspective there is no
|
||||
* implementation difference.
|
||||
*
|
||||
* <p>Typically an {@code XhrTransport} is used as "XHR streaming" first and
|
||||
* then, if that fails, as "XHR". In some cases however it may be helpful to
|
||||
* suppress XHR streaming so that only XHR is attempted.
|
||||
*
|
||||
* <p>By default this property is set to {@code false} which means both
|
||||
* "xhr_streaming" and "xhr" will be tried.
|
||||
* "XHR streaming" and "XHR" apply.
|
||||
*/
|
||||
public void setXhrStreamingDisabled(boolean disabled) {
|
||||
this.xhrStreamingDisabled = disabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether XHR streaming is disabled or not.
|
||||
*/
|
||||
public boolean isXhrStreamingDisabled() {
|
||||
return this.xhrStreamingDisabled;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,11 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
/**
|
||||
* A SockJS implementation of
|
||||
* {@link org.springframework.web.socket.client.WebSocketClient WebSocketClient}
|
||||
* with HTTP-based fallback alternative simulating a WebSocket interaction.
|
||||
* with fallback alternatives that simulate a WebSocket interaction through plain
|
||||
* HTTP streaming and long polling techniques..
|
||||
*
|
||||
* <p>Implements {@link Lifecycle} in order to propagate lifecycle events to
|
||||
* the transports it is configured with.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.1
|
||||
|
|
@ -59,27 +63,33 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
|
|||
private static final Log logger = LogFactory.getLog(SockJsClient.class);
|
||||
|
||||
|
||||
private final List<Transport> transports;
|
||||
|
||||
private InfoReceiver infoReceiver;
|
||||
|
||||
private final List<Transport> transports;
|
||||
|
||||
private SockJsMessageCodec messageCodec;
|
||||
|
||||
private TaskScheduler taskScheduler;
|
||||
private TaskScheduler connectTimeoutScheduler;
|
||||
|
||||
private final Map<URI, ServerInfo> infoCache = new ConcurrentHashMap<URI, ServerInfo>();
|
||||
private final Map<URI, ServerInfo> serverInfoCache = new ConcurrentHashMap<URI, ServerInfo>();
|
||||
|
||||
private volatile boolean running = false;
|
||||
|
||||
|
||||
/**
|
||||
* Create a {@code SockJsClient} with the given transports.
|
||||
* @param transports the transports to use
|
||||
*
|
||||
* <p>If the list includes an {@link XhrTransport} (or more specifically an
|
||||
* implementation of {@link InfoReceiver}) the instance is used to initialize
|
||||
* the {@link #setInfoReceiver(InfoReceiver) infoReceiver} property, or
|
||||
* otherwise is defaulted to {@link RestTemplateXhrTransport}.
|
||||
*
|
||||
* @param transports the (non-empty) list of transports to use
|
||||
*/
|
||||
public SockJsClient(List<Transport> transports) {
|
||||
Assert.notEmpty(transports, "No transports provided");
|
||||
this.transports = new ArrayList<Transport>(transports);
|
||||
this.infoReceiver = initInfoReceiver(transports);
|
||||
this.transports = new ArrayList<Transport>(transports);
|
||||
if (jackson2Present) {
|
||||
this.messageCodec = new Jackson2SockJsMessageCodec();
|
||||
}
|
||||
|
|
@ -99,16 +109,21 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
|
|||
* Configure the {@code InfoReceiver} to use to perform the SockJS "Info"
|
||||
* request before the SockJS session starts.
|
||||
*
|
||||
* <p>By default this is initialized either by looking through the configured
|
||||
* transports to find the first {@code XhrTransport} or by creating an
|
||||
* instance of {@code RestTemplateXhrTransport}.
|
||||
* <p>If the list of transports provided to the constructor contained an
|
||||
* {@link XhrTransport} or an implementation of {@link InfoReceiver} that
|
||||
* instance would have been used to initialize this property, or otherwise
|
||||
* it defaults to {@link RestTemplateXhrTransport}.
|
||||
*
|
||||
* @param infoReceiver the transport to use for the SockJS "Info" request
|
||||
*/
|
||||
public void setInfoReceiver(InfoReceiver infoReceiver) {
|
||||
Assert.notNull(infoReceiver, "'infoReceiver' is required");
|
||||
this.infoReceiver = infoReceiver;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured {@code InfoReceiver}, never {@code null}.
|
||||
*/
|
||||
public InfoReceiver getInfoReceiver() {
|
||||
return this.infoReceiver;
|
||||
}
|
||||
|
|
@ -133,22 +148,19 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
|
|||
/**
|
||||
* Configure a {@code TaskScheduler} for scheduling a connect timeout task
|
||||
* where the timeout value is calculated based on the duration of the initial
|
||||
* SockJS info request. Having a connect timeout task is optional but can
|
||||
* improve the speed with which the client falls back to alternative
|
||||
* transport options.
|
||||
*
|
||||
* <p>By default no task scheduler is configured in which case it may take
|
||||
* longer before a fallback transport can be used.
|
||||
*
|
||||
* @param taskScheduler the scheduler to use
|
||||
* SockJS "Info" request. The connect timeout task ensures a more timely
|
||||
* fallback but is otherwise entirely optional.
|
||||
* <p>By default this is not configured in which case a fallback may take longer.
|
||||
* @param connectTimeoutScheduler the task scheduler to use
|
||||
*/
|
||||
public void setTaskScheduler(TaskScheduler taskScheduler) {
|
||||
this.taskScheduler = taskScheduler;
|
||||
public void setConnectTimeoutScheduler(TaskScheduler connectTimeoutScheduler) {
|
||||
this.connectTimeoutScheduler = connectTimeoutScheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (!isRunning()) {
|
||||
this.running = true;
|
||||
for (Transport transport : this.transports) {
|
||||
if (transport instanceof Lifecycle) {
|
||||
if (!((Lifecycle) transport).isRunning()) {
|
||||
|
|
@ -162,6 +174,7 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
|
|||
@Override
|
||||
public void stop() {
|
||||
if (!isRunning()) {
|
||||
this.running = false;
|
||||
for (Transport transport : this.transports) {
|
||||
if (transport instanceof Lifecycle) {
|
||||
if (((Lifecycle) transport).isRunning()) {
|
||||
|
|
@ -177,8 +190,14 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
|
|||
return this.running;
|
||||
}
|
||||
|
||||
/**
|
||||
* By default the result of a SockJS "Info" request, including whether the
|
||||
* server has WebSocket disabled and how long the request took (used for
|
||||
* calculating transport timeout time) is cached. This method can be used to
|
||||
* clear that cache hence causing it to re-populate.
|
||||
*/
|
||||
public void clearServerInfoCache() {
|
||||
this.infoCache.clear();
|
||||
this.serverInfoCache.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -198,7 +217,7 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
|
|||
try {
|
||||
SockJsUrlInfo sockJsUrlInfo = new SockJsUrlInfo(url);
|
||||
ServerInfo serverInfo = getServerInfo(sockJsUrlInfo);
|
||||
createFallbackChain(sockJsUrlInfo, handshakeHeaders, serverInfo).connect(handler, connectFuture);
|
||||
createRequest(sockJsUrlInfo, handshakeHeaders, serverInfo).connect(handler, connectFuture);
|
||||
}
|
||||
catch (Throwable exception) {
|
||||
if (logger.isErrorEnabled()) {
|
||||
|
|
@ -211,52 +230,39 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
|
|||
|
||||
private ServerInfo getServerInfo(SockJsUrlInfo sockJsUrlInfo) {
|
||||
URI infoUrl = sockJsUrlInfo.getInfoUrl();
|
||||
ServerInfo info = this.infoCache.get(infoUrl);
|
||||
ServerInfo info = this.serverInfoCache.get(infoUrl);
|
||||
if (info == null) {
|
||||
long start = System.currentTimeMillis();
|
||||
String response = this.infoReceiver.executeInfoRequest(infoUrl);
|
||||
long infoRequestTime = System.currentTimeMillis() - start;
|
||||
info = new ServerInfo(response, infoRequestTime);
|
||||
this.infoCache.put(infoUrl, info);
|
||||
this.serverInfoCache.put(infoUrl, info);
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
private DefaultTransportRequest createFallbackChain(SockJsUrlInfo urlInfo, HttpHeaders headers, ServerInfo serverInfo) {
|
||||
private DefaultTransportRequest createRequest(SockJsUrlInfo urlInfo, HttpHeaders headers, ServerInfo serverInfo) {
|
||||
List<DefaultTransportRequest> requests = new ArrayList<DefaultTransportRequest>(this.transports.size());
|
||||
for (Transport transport : this.transports) {
|
||||
if (transport instanceof XhrTransport) {
|
||||
XhrTransport xhrTransport = (XhrTransport) transport;
|
||||
if (!xhrTransport.isXhrStreamingDisabled()) {
|
||||
addRequest(requests, urlInfo, headers, serverInfo, transport, TransportType.XHR_STREAMING);
|
||||
for (TransportType type : transport.getTransportTypes()) {
|
||||
if (serverInfo.isWebSocketEnabled() || !TransportType.WEBSOCKET.equals(type)) {
|
||||
requests.add(new DefaultTransportRequest(urlInfo, headers, transport, type, getMessageCodec()));
|
||||
}
|
||||
addRequest(requests, urlInfo, headers, serverInfo, transport, TransportType.XHR);
|
||||
}
|
||||
else if (serverInfo.isWebSocketEnabled()) {
|
||||
addRequest(requests, urlInfo, headers, serverInfo, transport, TransportType.WEBSOCKET);
|
||||
}
|
||||
}
|
||||
Assert.notEmpty(requests,
|
||||
"0 transports for request to " + urlInfo + " . Configured transports: " +
|
||||
this.transports + ". SockJS server webSocketEnabled=" + serverInfo.isWebSocketEnabled());
|
||||
Assert.notEmpty(requests, "No transports, " + urlInfo + ", wsEnabled=" + serverInfo.isWebSocketEnabled());
|
||||
for (int i = 0; i < requests.size() - 1; i++) {
|
||||
requests.get(i).setFallbackRequest(requests.get(i + 1));
|
||||
DefaultTransportRequest request = requests.get(i);
|
||||
request.setUser(getUser());
|
||||
if (this.connectTimeoutScheduler != null) {
|
||||
request.setTimeoutValue(serverInfo.getRetransmissionTimeout());
|
||||
request.setTimeoutScheduler(this.connectTimeoutScheduler);
|
||||
}
|
||||
request.setFallbackRequest(requests.get(i + 1));
|
||||
}
|
||||
return requests.get(0);
|
||||
}
|
||||
|
||||
private void addRequest(List<DefaultTransportRequest> requests, SockJsUrlInfo info, HttpHeaders headers,
|
||||
ServerInfo serverInfo, Transport transport, TransportType type) {
|
||||
|
||||
DefaultTransportRequest request = new DefaultTransportRequest(info, headers, transport, type, getMessageCodec());
|
||||
request.setUser(getUser());
|
||||
if (this.taskScheduler != null) {
|
||||
request.setTimeoutValue(serverInfo.getRetransmissionTimeout());
|
||||
request.setTimeoutScheduler(this.taskScheduler);
|
||||
}
|
||||
requests.add(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the user to associate with the SockJS session and make available via
|
||||
* {@link org.springframework.web.socket.WebSocketSession#getPrincipal()
|
||||
|
|
@ -269,6 +275,9 @@ public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* A simple value object holding the result from a SockJS "Info" request.
|
||||
*/
|
||||
private static class ServerInfo {
|
||||
|
||||
private final boolean webSocketEnabled;
|
||||
|
|
|
|||
|
|
@ -19,6 +19,9 @@ package org.springframework.web.socket.sockjs.client;
|
|||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.sockjs.transport.TransportType;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A client-side implementation for a SockJS transport.
|
||||
|
|
@ -28,6 +31,13 @@ import org.springframework.web.socket.WebSocketSession;
|
|||
*/
|
||||
public interface Transport {
|
||||
|
||||
/**
|
||||
* Return the SockJS transport types that this transport can be used for.
|
||||
* In particular since from a client perspective there is no difference
|
||||
* between XHR and XHR streaming, an {@code XhrTransport} could do both.
|
||||
*/
|
||||
List<TransportType> getTransportTypes();
|
||||
|
||||
/**
|
||||
* Connect the transport.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -30,8 +30,11 @@ import org.springframework.web.socket.WebSocketHttpHeaders;
|
|||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.client.WebSocketClient;
|
||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
||||
import org.springframework.web.socket.sockjs.transport.TransportType;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
|
|
@ -56,6 +59,11 @@ public class WebSocketTransport implements Transport, Lifecycle {
|
|||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<TransportType> getTransportTypes() {
|
||||
return Arrays.asList(TransportType.WEBSOCKET);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured {@code WebSocketClient}.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -218,7 +218,7 @@ public abstract class AbstractSockJsIntegrationTests {
|
|||
this.errorFilter.sleepDelayMap.put("/xhr_streaming", 10000L);
|
||||
this.errorFilter.responseStatusMap.put("/xhr_streaming", 503);
|
||||
initSockJsClient(createXhrTransport());
|
||||
this.sockJsClient.setTaskScheduler(this.wac.getBean(ThreadPoolTaskScheduler.class));
|
||||
this.sockJsClient.setConnectTimeoutScheduler(this.wac.getBean(ThreadPoolTaskScheduler.class));
|
||||
WebSocketSession clientSession = sockJsClient.doHandshake(clientHandler, this.baseUrl + "/echo").get();
|
||||
assertEquals("Fallback didn't occur", XhrClientSockJsSession.class, clientSession.getClass());
|
||||
TextMessage message = new TextMessage("message1");
|
||||
|
|
|
|||
|
|
@ -22,8 +22,11 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
|
|||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.sockjs.transport.TransportType;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
|
@ -46,6 +49,11 @@ class TestTransport implements Transport {
|
|||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TransportType> getTransportTypes() {
|
||||
return Arrays.asList(TransportType.WEBSOCKET);
|
||||
}
|
||||
|
||||
public TransportRequest getRequest() {
|
||||
return this.request;
|
||||
}
|
||||
|
|
@ -84,6 +92,13 @@ class TestTransport implements Transport {
|
|||
super(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TransportType> getTransportTypes() {
|
||||
return (isXhrStreamingDisabled() ?
|
||||
Arrays.asList(TransportType.XHR) :
|
||||
Arrays.asList(TransportType.XHR_STREAMING, TransportType.XHR));
|
||||
}
|
||||
|
||||
public void setStreamingDisabled(boolean streamingDisabled) {
|
||||
this.streamingDisabled = streamingDisabled;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue