Polish SockJS client

This commit is contained in:
Rossen Stoyanchev 2015-07-28 08:11:52 -04:00
parent 6a59d00576
commit 9f557cf930
9 changed files with 347 additions and 262 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,7 @@ package org.springframework.web.socket.sockjs.client;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@ -66,7 +67,7 @@ public abstract class AbstractXhrTransport implements XhrTransport {
@Override
public List<TransportType> getTransportTypes() {
return (isXhrStreamingDisabled() ?
Arrays.asList(TransportType.XHR) :
Collections.singletonList(TransportType.XHR) :
Arrays.asList(TransportType.XHR_STREAMING, TransportType.XHR));
}
@ -111,44 +112,8 @@ public abstract class AbstractXhrTransport implements XhrTransport {
return this.requestHeaders;
}
@Override
public String executeInfoRequest(URI infoUrl) {
if (logger.isDebugEnabled()) {
logger.debug("Executing SockJS Info request, url=" + infoUrl);
}
ResponseEntity<String> response = executeInfoRequestInternal(infoUrl);
if (response.getStatusCode() != HttpStatus.OK) {
if (logger.isErrorEnabled()) {
logger.error("SockJS Info request (url=" + infoUrl + ") failed: " + response);
}
throw new HttpServerErrorException(response.getStatusCode());
}
if (logger.isTraceEnabled()) {
logger.trace("SockJS Info request (url=" + infoUrl + ") response: " + response);
}
return response.getBody();
}
protected abstract ResponseEntity<String> executeInfoRequestInternal(URI infoUrl);
@Override
public void executeSendRequest(URI url, TextMessage message) {
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR send, url=" + url);
}
ResponseEntity<String> response = executeSendRequestInternal(url, this.xhrSendRequestHeaders, message);
if (response.getStatusCode() != HttpStatus.NO_CONTENT) {
if (logger.isErrorEnabled()) {
logger.error("XHR send request (url=" + url + ") failed: " + response);
}
throw new HttpServerErrorException(response.getStatusCode());
}
if (logger.isTraceEnabled()) {
logger.trace("XHR send request (url=" + url + ") response: " + response);
}
}
protected abstract ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message);
// Transport methods
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
@ -174,6 +139,49 @@ public abstract class AbstractXhrTransport implements XhrTransport {
URI receiveUrl, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture);
// InfoReceiver methods
@Override
public String executeInfoRequest(URI infoUrl) {
if (logger.isDebugEnabled()) {
logger.debug("Executing SockJS Info request, url=" + infoUrl);
}
ResponseEntity<String> response = executeInfoRequestInternal(infoUrl);
if (response.getStatusCode() != HttpStatus.OK) {
if (logger.isErrorEnabled()) {
logger.error("SockJS Info request (url=" + infoUrl + ") failed: " + response);
}
throw new HttpServerErrorException(response.getStatusCode());
}
if (logger.isTraceEnabled()) {
logger.trace("SockJS Info request (url=" + infoUrl + ") response: " + response);
}
return response.getBody();
}
protected abstract ResponseEntity<String> executeInfoRequestInternal(URI infoUrl);
// XhrTransport methods
@Override
public void executeSendRequest(URI url, TextMessage message) {
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR send, url=" + url);
}
ResponseEntity<String> response = executeSendRequestInternal(url, this.xhrSendRequestHeaders, message);
if (response.getStatusCode() != HttpStatus.NO_CONTENT) {
if (logger.isErrorEnabled()) {
logger.error("XHR send request (url=" + url + ") failed: " + response);
}
throw new HttpServerErrorException(response.getStatusCode());
}
if (logger.isTraceEnabled()) {
logger.trace("XHR send request (url=" + url + ") response: " + response);
}
}
protected abstract ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message);
@Override
public String toString() {

View File

@ -1,14 +1,33 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.sockjs.client;
import java.net.URI;
/**
* A simple contract for executing the SockJS "Info" request before the SockJS
* session starts. The request is used to check server capabilities such as
* whether it permits use of the WebSocket transport.
* A component that can execute the SockJS "Info" request that needs to be
* performed before the SockJS session starts in order to check server endpoint
* capabilities such as whether the endpoint permits use of WebSocket.
*
* <p>Typically {@link XhrTransport} implementations are also implementations
* of this contract.
*
* @author Rossen Stoyanchev
* @since 4.1
* @see AbstractXhrTransport
*/
public interface InfoReceiver {

View File

@ -44,7 +44,6 @@ import org.springframework.web.socket.sockjs.SockJsException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
/**
* An XHR transport based on Jetty's {@link org.eclipse.jetty.client.HttpClient}.
*
@ -105,6 +104,25 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp
return this.httpClient.isRunning();
}
@Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler,
URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
SockJsResponseListener listener = new SockJsResponseListener(url, getRequestHeaders(), session, connectFuture);
executeReceiveRequest(url, handshakeHeaders, listener);
}
private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request, url=" + url);
}
Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
addHttpHeaders(httpRequest, headers);
httpRequest.send(listener);
}
@Override
protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
return executeRequest(infoUrl, HttpMethod.GET, getRequestHeaders(), null);
@ -157,28 +175,11 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp
return responseHeaders;
}
@Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler,
URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
SockJsResponseListener listener = new SockJsResponseListener(url, getRequestHeaders(), session, connectFuture);
executeReceiveRequest(url, handshakeHeaders, listener);
}
private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request, url=" + url);
}
Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
addHttpHeaders(httpRequest, headers);
httpRequest.send(listener);
}
/**
* Splits the body of an HTTP response into SockJS frames and delegates those
* to an {@link XhrClientSockJsSession}.
* Jetty client {@link org.eclipse.jetty.client.api.Response.Listener Response
* Listener} that splits the body of the response into SockJS frames and
* delegates them to the {@link XhrClientSockJsSession}.
*/
private class SockJsResponseListener extends Response.Listener.Adapter {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -93,18 +93,6 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh
}
@Override
public ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
RequestCallback requestCallback = new XhrRequestCallback(getRequestHeaders());
return this.restTemplate.execute(infoUrl, HttpMethod.GET, requestCallback, textExtractor);
}
@Override
public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
RequestCallback requestCallback = new XhrRequestCallback(headers, message.getPayload());
return this.restTemplate.execute(url, HttpMethod.POST, requestCallback, textExtractor);
}
@Override
protected void connectInternal(final TransportRequest request, final WebSocketHandler handler,
final URI receiveUrl, final HttpHeaders handshakeHeaders, final XhrClientSockJsSession session,
@ -143,11 +131,23 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh
});
}
@Override
public ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
RequestCallback requestCallback = new XhrRequestCallback(getRequestHeaders());
return this.restTemplate.execute(infoUrl, HttpMethod.GET, requestCallback, textResponseExtractor);
}
@Override
public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
RequestCallback requestCallback = new XhrRequestCallback(headers, message.getPayload());
return this.restTemplate.execute(url, HttpMethod.POST, requestCallback, textResponseExtractor);
}
/**
* A simple ResponseExtractor that reads the body into a String.
*/
private final static ResponseExtractor<ResponseEntity<String>> textExtractor =
private final static ResponseExtractor<ResponseEntity<String>> textResponseExtractor =
new ResponseExtractor<ResponseEntity<String>>() {
@Override
public ResponseEntity<String> extractData(ClientHttpResponse response) throws IOException {
@ -161,7 +161,6 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh
}
};
/**
* A RequestCallback to add the headers and (optionally) String content.
*/
@ -191,7 +190,6 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh
}
}
/**
* Splits the body of an HTTP response into SockJS frames and delegates those
* to an {@link XhrClientSockJsSession}.

View File

@ -25,8 +25,9 @@ import org.springframework.web.socket.sockjs.transport.TransportType;
import org.springframework.web.util.UriComponentsBuilder;
/**
* Given the base URL to a SockJS server endpoint, also provides methods to
* generate and obtain session and a server id used for construct a transport URL.
* Container for the base URL of a SockJS endpoint with additional helper methods
* to derive related SockJS URLs as the {@link #getInfoUrl() info} URL and
* {@link #getTransportUrl(TransportType) transport} URLs.
*
* @author Rossen Stoyanchev
* @since 4.1

View File

@ -1,15 +1,36 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.sockjs.client;
import java.net.URI;
import java.security.Principal;
import org.springframework.http.HttpHeaders;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
/**
* Represents a request to connect to a SockJS service using a specific
* Transport. A single SockJS request however may require falling back
* and therefore multiple TransportRequest instances.
* Exposes information, typically to {@link Transport} and
* {@link AbstractClientSockJsSession session} implementations, about a request
* to connect to a SockJS server endpoint over a given transport.
*
* <p>Note that a single request to connect via {@link SockJsClient} may result
* in multiple instances of {@link TransportRequest}, one for each transport
* before a connection is successfully established.
*
* @author Rossen Stoyanchev
* @since 4.1

View File

@ -20,7 +20,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -82,6 +81,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrame;
* </pre>
*
* @author Brian Clozel
* @author Rossen Stoyanchev
* @since 4.1.2
* @see org.xnio.Options
*/
@ -90,10 +90,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
private static final AttachmentKey<String> RESPONSE_BODY = AttachmentKey.create(String.class);
private final UndertowClient httpClient;
private final OptionMap optionMap;
private final UndertowClient httpClient;
private final XnioWorker worker;
private final Pool<ByteBuffer> bufferPool;
@ -112,29 +112,6 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
private static HttpHeaders toHttpHeaders(HeaderMap headerMap) {
HttpHeaders responseHeaders = new HttpHeaders();
Iterator<HttpString> names = headerMap.getHeaderNames().iterator();
while (names.hasNext()) {
HttpString name = names.next();
Iterator<String> values = headerMap.get(name).iterator();
while (values.hasNext()) {
responseHeaders.add(name.toString(), values.next());
}
}
return responseHeaders;
}
private static void addHttpHeaders(ClientRequest request, HttpHeaders headers) {
HeaderMap headerMap = request.getRequestHeaders();
for (String name : headers.keySet()) {
for (String value : headers.get(name)) {
headerMap.add(HttpString.tryFromString(name), value);
}
}
}
/**
* Return Undertow's native HTTP client
*/
@ -152,6 +129,130 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
@Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
executeReceiveRequest(receiveUrl, handshakeHeaders, session, connectFuture);
}
private void executeReceiveRequest(final URI url, final HttpHeaders headers,
final XhrClientSockJsSession session,
final SettableListenableFuture<WebSocketSession> connectFuture) {
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request, url=" + url);
}
this.httpClient.connect(
new ClientCallback<ClientConnection>() {
@Override
public void completed(ClientConnection connection) {
ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(url.getPath());
HttpString headerName = HttpString.tryFromString(HttpHeaders.HOST);
request.getRequestHeaders().add(headerName, url.getHost());
addHttpHeaders(request, headers);
connection.sendRequest(request, createReceiveCallback(url,
getRequestHeaders(), session, connectFuture));
}
@Override
public void failed(IOException ex) {
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
}
},
url, this.worker, this.bufferPool, this.optionMap);
}
private static void addHttpHeaders(ClientRequest request, HttpHeaders headers) {
HeaderMap headerMap = request.getRequestHeaders();
for (String name : headers.keySet()) {
for (String value : headers.get(name)) {
headerMap.add(HttpString.tryFromString(name), value);
}
}
}
private ClientCallback<ClientExchange> createReceiveCallback(final URI url, final HttpHeaders headers,
final XhrClientSockJsSession sockJsSession,
final SettableListenableFuture<WebSocketSession> connectFuture) {
return new ClientCallback<ClientExchange>() {
@Override
public void completed(final ClientExchange exchange) {
exchange.setResponseListener(new ClientCallback<ClientExchange>() {
@Override
public void completed(ClientExchange result) {
ClientResponse response = result.getResponse();
if (response.getResponseCode() != 200) {
HttpStatus status = HttpStatus.valueOf(response.getResponseCode());
IoUtils.safeClose(result.getConnection());
onFailure(new HttpServerErrorException(status, "Unexpected XHR receive status"));
}
else {
SockJsResponseListener listener = new SockJsResponseListener(result.getConnection(),
url, headers, sockJsSession, connectFuture);
listener.setup(result.getResponseChannel());
}
if (logger.isTraceEnabled()) {
logger.trace("XHR receive headers: " + toHttpHeaders(response.getResponseHeaders()));
}
try {
StreamSinkChannel channel = result.getRequestChannel();
channel.shutdownWrites();
if (!channel.flush()) {
channel.getWriteSetter().set(ChannelListeners.<StreamSinkChannel>flushingChannelListener(null, null));
channel.resumeWrites();
}
}
catch (IOException exc) {
IoUtils.safeClose(result.getConnection());
onFailure(exc);
}
}
@Override
public void failed(IOException exc) {
IoUtils.safeClose(exchange.getConnection());
onFailure(exc);
}
});
}
@Override
public void failed(IOException exc) {
onFailure(exc);
}
private void onFailure(Throwable failure) {
if (connectFuture.setException(failure)) {
return;
}
if (sockJsSession.isDisconnected()) {
sockJsSession.afterTransportClosed(null);
}
else {
sockJsSession.handleTransportError(failure);
sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
}
}
};
}
private static HttpHeaders toHttpHeaders(HeaderMap headerMap) {
HttpHeaders httpHeaders = new HttpHeaders();
for (HttpString name : headerMap.getHeaderNames()) {
for (String value : headerMap.get(name)) {
httpHeaders.add(name.toString(), value);
}
}
return httpHeaders;
}
@Override
protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
return executeRequest(infoUrl, Methods.GET, getRequestHeaders(), null);
@ -167,13 +268,14 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
List<ClientResponse> responses = new CopyOnWriteArrayList<ClientResponse>();
try {
ClientConnection connection = this.httpClient.connect(
url, this.worker, this.bufferPool, this.optionMap).get();
ClientConnection connection = this.httpClient.connect(url, this.worker,
this.bufferPool, this.optionMap).get();
try {
ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath());
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost());
if (body != null && !body.isEmpty()) {
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.CONTENT_LENGTH), body.length());
HttpString headerName = HttpString.tryFromString(HttpHeaders.CONTENT_LENGTH);
request.getRequestHeaders().add(headerName, body.length());
}
addHttpHeaders(request, headers);
connection.sendRequest(request, createRequestCallback(body, responses, latch));
@ -197,13 +299,13 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
catch (InterruptedException ex) {
throw new SockJsTransportFailureException("Interrupted while processing request to " + url, ex);
}
}
private ClientCallback<ClientExchange> createRequestCallback(final String body,
final List<ClientResponse> responses, final CountDownLatch latch) {
return new ClientCallback<ClientExchange>() {
@Override
public void completed(ClientExchange result) {
result.setResponseListener(new ClientCallback<ClientExchange>() {
@ -242,10 +344,12 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
onFailure(latch, ex);
}
}
@Override
public void failed(IOException ex) {
onFailure(latch, ex);
}
private void onFailure(CountDownLatch latch, IOException ex) {
latch.countDown();
throw new SockJsTransportFailureException("Failed to execute request", ex);
@ -253,103 +357,8 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
};
}
@Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
executeReceiveRequest(receiveUrl, handshakeHeaders, session, connectFuture);
}
private void executeReceiveRequest(final URI url, final HttpHeaders headers, final XhrClientSockJsSession session,
final SettableListenableFuture<WebSocketSession> connectFuture) {
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request, url=" + url);
}
this.httpClient.connect(
new ClientCallback<ClientConnection>() {
@Override
public void completed(ClientConnection result) {
final ClientRequest httpRequest = new ClientRequest().setMethod(Methods.POST).setPath(url.getPath());
httpRequest.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost());
addHttpHeaders(httpRequest, headers);
result.sendRequest(httpRequest, createConnectCallback(url, getRequestHeaders(), session, connectFuture));
}
@Override
public void failed(IOException ex) {
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
}
},
url, this.worker, this.bufferPool, this.optionMap);
}
private ClientCallback<ClientExchange> createConnectCallback(final URI url, final HttpHeaders headers,
final XhrClientSockJsSession sockJsSession, final SettableListenableFuture<WebSocketSession> connectFuture) {
return new ClientCallback<ClientExchange>() {
@Override
public void completed(final ClientExchange result) {
result.setResponseListener(new ClientCallback<ClientExchange>() {
@Override
public void completed(ClientExchange result) {
ClientResponse response = result.getResponse();
if (response.getResponseCode() != 200) {
HttpStatus status = HttpStatus.valueOf(response.getResponseCode());
IoUtils.safeClose(result.getConnection());
onFailure(new HttpServerErrorException(status, "Unexpected XHR receive status"));
}
else {
SockJsResponseListener listener = new SockJsResponseListener(result.getConnection(),
url, headers, sockJsSession, connectFuture);
listener.setup(result.getResponseChannel());
}
if (logger.isTraceEnabled()) {
logger.trace("XHR receive headers: " + toHttpHeaders(response.getResponseHeaders()));
}
try {
result.getRequestChannel().shutdownWrites();
if (!result.getRequestChannel().flush()) {
result.getRequestChannel().getWriteSetter()
.set(ChannelListeners.<StreamSinkChannel>flushingChannelListener(null, null));
result.getRequestChannel().resumeWrites();
}
}
catch (IOException exc) {
IoUtils.safeClose(result.getConnection());
onFailure(exc);
}
}
@Override
public void failed(IOException exc) {
IoUtils.safeClose(result.getConnection());
onFailure(exc);
}
});
}
@Override
public void failed(IOException exc) {
onFailure(exc);
}
private void onFailure(Throwable failure) {
if (connectFuture.setException(failure)) {
return;
}
if (sockJsSession.isDisconnected()) {
sockJsSession.afterTransportClosed(null);
}
else {
sockJsSession.handleTransportError(failure);
sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
}
}
};
}
public class SockJsResponseListener implements ChannelListener<StreamSourceChannel> {
private class SockJsResponseListener implements ChannelListener<StreamSourceChannel> {
private final ClientConnection connection;
@ -363,8 +372,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
public SockJsResponseListener(ClientConnection connection, URI url, HttpHeaders headers,
XhrClientSockJsSession sockJsSession, SettableListenableFuture<WebSocketSession> connectFuture) {
public SockJsResponseListener(ClientConnection connection, URI url,
HttpHeaders headers, XhrClientSockJsSession sockJsSession,
SettableListenableFuture<WebSocketSession> connectFuture) {
this.connection = connection;
this.url = url;

View File

@ -1,3 +1,18 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.sockjs.client;
import java.net.URI;

View File

@ -17,6 +17,7 @@
package org.springframework.web.socket.sockjs.client;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -27,7 +28,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@ -39,7 +39,6 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@ -56,6 +55,7 @@ import org.springframework.tests.TestGroup;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.WebSocketTestServer;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
@ -66,7 +66,10 @@ import org.springframework.web.socket.server.HandshakeHandler;
import org.springframework.web.socket.server.RequestUpgradeStrategy;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Abstract base class for integration tests using the
@ -90,7 +93,7 @@ public abstract class AbstractSockJsIntegrationTests {
private AnnotationConfigWebApplicationContext wac;
private ErrorFilter errorFilter;
private TestFilter testFilter;
private String baseUrl;
@ -104,12 +107,12 @@ public abstract class AbstractSockJsIntegrationTests {
@Before
public void setup() throws Exception {
logger.debug("Setting up '" + this.testName.getMethodName() + "'");
this.errorFilter = new ErrorFilter();
this.testFilter = new TestFilter();
this.wac = new AnnotationConfigWebApplicationContext();
this.wac.register(TestConfig.class, upgradeStrategyConfigClass());
this.server = createWebSocketTestServer();
this.server.setup();
this.server.deployConfig(this.wac, this.errorFilter);
this.server.deployConfig(this.wac, this.testFilter);
// Set ServletContext in WebApplicationContext after deployment but before
// starting the server.
this.wac.setServletContext(this.server.getServletContext());
@ -178,25 +181,25 @@ public abstract class AbstractSockJsIntegrationTests {
@Test
public void receiveOneMessageWebSocket() throws Exception {
testReceiveOneMessage(createWebSocketTransport());
testReceiveOneMessage(createWebSocketTransport(), null);
}
@Test
public void receiveOneMessageXhrStreaming() throws Exception {
testReceiveOneMessage(createXhrTransport());
testReceiveOneMessage(createXhrTransport(), null);
}
@Test
public void receiveOneMessageXhr() throws Exception {
AbstractXhrTransport xhrTransport = createXhrTransport();
xhrTransport.setXhrStreamingDisabled(true);
testReceiveOneMessage(xhrTransport);
testReceiveOneMessage(xhrTransport, null);
}
@Test
public void infoRequestFailure() throws Exception {
TestClientHandler handler = new TestClientHandler();
this.errorFilter.responseStatusMap.put("/info", 500);
this.testFilter.sendErrorMap.put("/info", 500);
CountDownLatch latch = new CountDownLatch(1);
initSockJsClient(createWebSocketTransport());
this.sockJsClient.doHandshake(handler, this.baseUrl + "/echo").addCallback(
@ -204,6 +207,7 @@ public abstract class AbstractSockJsIntegrationTests {
@Override
public void onSuccess(WebSocketSession result) {
}
@Override
public void onFailure(Throwable ex) {
latch.countDown();
@ -215,8 +219,8 @@ public abstract class AbstractSockJsIntegrationTests {
@Test
public void fallbackAfterTransportFailure() throws Exception {
this.errorFilter.responseStatusMap.put("/websocket", 200);
this.errorFilter.responseStatusMap.put("/xhr_streaming", 500);
this.testFilter.sendErrorMap.put("/websocket", 200);
this.testFilter.sendErrorMap.put("/xhr_streaming", 500);
TestClientHandler handler = new TestClientHandler();
initSockJsClient(createWebSocketTransport(), createXhrTransport());
WebSocketSession session = this.sockJsClient.doHandshake(handler, this.baseUrl + "/echo").get();
@ -229,8 +233,8 @@ public abstract class AbstractSockJsIntegrationTests {
@Test(timeout = 5000)
public void fallbackAfterConnectTimeout() throws Exception {
TestClientHandler clientHandler = new TestClientHandler();
this.errorFilter.sleepDelayMap.put("/xhr_streaming", 10000L);
this.errorFilter.responseStatusMap.put("/xhr_streaming", 503);
this.testFilter.sleepDelayMap.put("/xhr_streaming", 10000L);
this.testFilter.sendErrorMap.put("/xhr_streaming", 503);
initSockJsClient(createXhrTransport());
this.sockJsClient.setConnectTimeoutScheduler(this.wac.getBean(ThreadPoolTaskScheduler.class));
WebSocketSession clientSession = sockJsClient.doHandshake(clientHandler, this.baseUrl + "/echo").get();
@ -261,10 +265,12 @@ public abstract class AbstractSockJsIntegrationTests {
session.close();
}
private void testReceiveOneMessage(Transport transport) throws Exception {
private void testReceiveOneMessage(Transport transport, WebSocketHttpHeaders headers)
throws Exception {
TestClientHandler clientHandler = new TestClientHandler();
initSockJsClient(transport);
this.sockJsClient.doHandshake(clientHandler, this.baseUrl + "/test").get();
this.sockJsClient.doHandshake(clientHandler, headers, new URI(this.baseUrl + "/test")).get();
TestServerHandler serverHandler = this.wac.getBean(TestServerHandler.class);
assertNotNull("afterConnectionEstablished should have been called", clientHandler.session);
@ -275,6 +281,22 @@ public abstract class AbstractSockJsIntegrationTests {
clientHandler.awaitMessage(message, 5000);
}
private static void awaitEvent(Supplier<Boolean> condition, long timeToWait, String description) {
long timeToSleep = 200;
for (int i = 0 ; i < Math.floor(timeToWait / timeToSleep); i++) {
if (condition.get()) {
return;
}
try {
Thread.sleep(timeToSleep);
}
catch (InterruptedException e) {
throw new IllegalStateException("Interrupted while waiting for " + description, e);
}
}
throw new IllegalStateException("Timed out waiting for " + description);
}
@Configuration
@EnableWebSocket
@ -296,23 +318,6 @@ public abstract class AbstractSockJsIntegrationTests {
}
}
private static void awaitEvent(Supplier<Boolean> condition, long timeToWait, String description) {
long timeToSleep = 200;
for (int i = 0 ; i < Math.floor(timeToWait / timeToSleep); i++) {
if (condition.get()) {
return;
}
try {
Thread.sleep(timeToSleep);
}
catch (InterruptedException e) {
throw new IllegalStateException("Interrupted while waiting for " + description, e);
}
}
throw new IllegalStateException("Timed out waiting for " + description);
}
private static class TestClientHandler extends TextWebSocketHandler {
private final BlockingQueue<TextMessage> receivedMessages = new LinkedBlockingQueue<>();
@ -356,6 +361,14 @@ public abstract class AbstractSockJsIntegrationTests {
}
}
private static class EchoHandler extends TextWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
session.sendMessage(message);
}
}
private static class TestServerHandler extends TextWebSocketHandler {
private WebSocketSession session;
@ -371,24 +384,23 @@ public abstract class AbstractSockJsIntegrationTests {
}
}
private static class EchoHandler extends TextWebSocketHandler {
private static class TestFilter implements Filter {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
session.sendMessage(message);
}
}
private static class ErrorFilter implements Filter {
private final Map<String, Integer> responseStatusMap = new HashMap<>();
private final List<ServletRequest> requests = new ArrayList<>();
private final Map<String, Long> sleepDelayMap = new HashMap<>();
private final Map<String, Integer> sendErrorMap = new HashMap<>();
@Override
public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException, ServletException {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
this.requests.add(request);
for (String suffix : this.sleepDelayMap.keySet()) {
if (((HttpServletRequest) req).getRequestURI().endsWith(suffix)) {
if (((HttpServletRequest) request).getRequestURI().endsWith(suffix)) {
try {
Thread.sleep(this.sleepDelayMap.get(suffix));
break;
@ -398,17 +410,17 @@ public abstract class AbstractSockJsIntegrationTests {
}
}
}
for (String suffix : this.responseStatusMap.keySet()) {
if (((HttpServletRequest) req).getRequestURI().endsWith(suffix)) {
((HttpServletResponse) resp).sendError(this.responseStatusMap.get(suffix));
for (String suffix : this.sendErrorMap.keySet()) {
if (((HttpServletRequest) request).getRequestURI().endsWith(suffix)) {
((HttpServletResponse) response).sendError(this.sendErrorMap.get(suffix));
return;
}
}
chain.doFilter(req, resp);
chain.doFilter(request, response);
}
@Override
public void init(FilterConfig filterConfig) throws ServletException {
public void init(FilterConfig filterConfig) {
}
@Override