diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractXhrTransport.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractXhrTransport.java index 87f53cec202..a8e12ac3b57 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractXhrTransport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractXhrTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.web.socket.sockjs.client; import java.net.URI; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -66,7 +67,7 @@ public abstract class AbstractXhrTransport implements XhrTransport { @Override public List getTransportTypes() { return (isXhrStreamingDisabled() ? - Arrays.asList(TransportType.XHR) : + Collections.singletonList(TransportType.XHR) : Arrays.asList(TransportType.XHR_STREAMING, TransportType.XHR)); } @@ -111,44 +112,8 @@ public abstract class AbstractXhrTransport implements XhrTransport { return this.requestHeaders; } - @Override - public String executeInfoRequest(URI infoUrl) { - if (logger.isDebugEnabled()) { - logger.debug("Executing SockJS Info request, url=" + infoUrl); - } - ResponseEntity response = executeInfoRequestInternal(infoUrl); - if (response.getStatusCode() != HttpStatus.OK) { - if (logger.isErrorEnabled()) { - logger.error("SockJS Info request (url=" + infoUrl + ") failed: " + response); - } - throw new HttpServerErrorException(response.getStatusCode()); - } - if (logger.isTraceEnabled()) { - logger.trace("SockJS Info request (url=" + infoUrl + ") response: " + response); - } - return response.getBody(); - } - protected abstract ResponseEntity executeInfoRequestInternal(URI infoUrl); - - @Override - public void executeSendRequest(URI url, TextMessage message) { - if (logger.isTraceEnabled()) { - logger.trace("Starting XHR send, url=" + url); - } - ResponseEntity response = executeSendRequestInternal(url, this.xhrSendRequestHeaders, message); - if (response.getStatusCode() != HttpStatus.NO_CONTENT) { - if (logger.isErrorEnabled()) { - logger.error("XHR send request (url=" + url + ") failed: " + response); - } - throw new HttpServerErrorException(response.getStatusCode()); - } - if (logger.isTraceEnabled()) { - logger.trace("XHR send request (url=" + url + ") response: " + response); - } - } - - protected abstract ResponseEntity executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message); + // Transport methods @Override public ListenableFuture connect(TransportRequest request, WebSocketHandler handler) { @@ -174,6 +139,49 @@ public abstract class AbstractXhrTransport implements XhrTransport { URI receiveUrl, HttpHeaders handshakeHeaders, XhrClientSockJsSession session, SettableListenableFuture connectFuture); + // InfoReceiver methods + + @Override + public String executeInfoRequest(URI infoUrl) { + if (logger.isDebugEnabled()) { + logger.debug("Executing SockJS Info request, url=" + infoUrl); + } + ResponseEntity response = executeInfoRequestInternal(infoUrl); + if (response.getStatusCode() != HttpStatus.OK) { + if (logger.isErrorEnabled()) { + logger.error("SockJS Info request (url=" + infoUrl + ") failed: " + response); + } + throw new HttpServerErrorException(response.getStatusCode()); + } + if (logger.isTraceEnabled()) { + logger.trace("SockJS Info request (url=" + infoUrl + ") response: " + response); + } + return response.getBody(); + } + + protected abstract ResponseEntity executeInfoRequestInternal(URI infoUrl); + + // XhrTransport methods + + @Override + public void executeSendRequest(URI url, TextMessage message) { + if (logger.isTraceEnabled()) { + logger.trace("Starting XHR send, url=" + url); + } + ResponseEntity response = executeSendRequestInternal(url, this.xhrSendRequestHeaders, message); + if (response.getStatusCode() != HttpStatus.NO_CONTENT) { + if (logger.isErrorEnabled()) { + logger.error("XHR send request (url=" + url + ") failed: " + response); + } + throw new HttpServerErrorException(response.getStatusCode()); + } + if (logger.isTraceEnabled()) { + logger.trace("XHR send request (url=" + url + ") response: " + response); + } + } + + protected abstract ResponseEntity executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message); + @Override public String toString() { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/InfoReceiver.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/InfoReceiver.java index ae8ba7bf011..e921c97d720 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/InfoReceiver.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/InfoReceiver.java @@ -1,14 +1,33 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.web.socket.sockjs.client; import java.net.URI; /** - * A simple contract for executing the SockJS "Info" request before the SockJS - * session starts. The request is used to check server capabilities such as - * whether it permits use of the WebSocket transport. + * A component that can execute the SockJS "Info" request that needs to be + * performed before the SockJS session starts in order to check server endpoint + * capabilities such as whether the endpoint permits use of WebSocket. + * + *

Typically {@link XhrTransport} implementations are also implementations + * of this contract. * * @author Rossen Stoyanchev * @since 4.1 + * @see AbstractXhrTransport */ public interface InfoReceiver { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java index 710400a3210..969c71e2781 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java @@ -44,7 +44,6 @@ import org.springframework.web.socket.sockjs.SockJsException; import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.frame.SockJsFrame; - /** * An XHR transport based on Jetty's {@link org.eclipse.jetty.client.HttpClient}. * @@ -105,6 +104,25 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp return this.httpClient.isRunning(); } + + @Override + protected void connectInternal(TransportRequest request, WebSocketHandler handler, + URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session, + SettableListenableFuture connectFuture) { + + SockJsResponseListener listener = new SockJsResponseListener(url, getRequestHeaders(), session, connectFuture); + executeReceiveRequest(url, handshakeHeaders, listener); + } + + private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) { + if (logger.isTraceEnabled()) { + logger.trace("Starting XHR receive request, url=" + url); + } + Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST); + addHttpHeaders(httpRequest, headers); + httpRequest.send(listener); + } + @Override protected ResponseEntity executeInfoRequestInternal(URI infoUrl) { return executeRequest(infoUrl, HttpMethod.GET, getRequestHeaders(), null); @@ -157,28 +175,11 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp return responseHeaders; } - @Override - protected void connectInternal(TransportRequest request, WebSocketHandler handler, - URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session, - SettableListenableFuture connectFuture) { - - SockJsResponseListener listener = new SockJsResponseListener(url, getRequestHeaders(), session, connectFuture); - executeReceiveRequest(url, handshakeHeaders, listener); - } - - private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) { - if (logger.isTraceEnabled()) { - logger.trace("Starting XHR receive request, url=" + url); - } - Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST); - addHttpHeaders(httpRequest, headers); - httpRequest.send(listener); - } - /** - * Splits the body of an HTTP response into SockJS frames and delegates those - * to an {@link XhrClientSockJsSession}. + * Jetty client {@link org.eclipse.jetty.client.api.Response.Listener Response + * Listener} that splits the body of the response into SockJS frames and + * delegates them to the {@link XhrClientSockJsSession}. */ private class SockJsResponseListener extends Response.Listener.Adapter { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/RestTemplateXhrTransport.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/RestTemplateXhrTransport.java index 75b688d2cde..5d6d25cf3cd 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/RestTemplateXhrTransport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/RestTemplateXhrTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -93,18 +93,6 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh } - @Override - public ResponseEntity executeInfoRequestInternal(URI infoUrl) { - RequestCallback requestCallback = new XhrRequestCallback(getRequestHeaders()); - return this.restTemplate.execute(infoUrl, HttpMethod.GET, requestCallback, textExtractor); - } - - @Override - public ResponseEntity executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) { - RequestCallback requestCallback = new XhrRequestCallback(headers, message.getPayload()); - return this.restTemplate.execute(url, HttpMethod.POST, requestCallback, textExtractor); - } - @Override protected void connectInternal(final TransportRequest request, final WebSocketHandler handler, final URI receiveUrl, final HttpHeaders handshakeHeaders, final XhrClientSockJsSession session, @@ -143,11 +131,23 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh }); } + @Override + public ResponseEntity executeInfoRequestInternal(URI infoUrl) { + RequestCallback requestCallback = new XhrRequestCallback(getRequestHeaders()); + return this.restTemplate.execute(infoUrl, HttpMethod.GET, requestCallback, textResponseExtractor); + } + + @Override + public ResponseEntity executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) { + RequestCallback requestCallback = new XhrRequestCallback(headers, message.getPayload()); + return this.restTemplate.execute(url, HttpMethod.POST, requestCallback, textResponseExtractor); + } + /** * A simple ResponseExtractor that reads the body into a String. */ - private final static ResponseExtractor> textExtractor = + private final static ResponseExtractor> textResponseExtractor = new ResponseExtractor>() { @Override public ResponseEntity extractData(ClientHttpResponse response) throws IOException { @@ -161,7 +161,6 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh } }; - /** * A RequestCallback to add the headers and (optionally) String content. */ @@ -191,7 +190,6 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh } } - /** * Splits the body of an HTTP response into SockJS frames and delegates those * to an {@link XhrClientSockJsSession}. diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsUrlInfo.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsUrlInfo.java index 97ecd1f863f..7d916d4c4c2 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsUrlInfo.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsUrlInfo.java @@ -25,8 +25,9 @@ import org.springframework.web.socket.sockjs.transport.TransportType; import org.springframework.web.util.UriComponentsBuilder; /** - * Given the base URL to a SockJS server endpoint, also provides methods to - * generate and obtain session and a server id used for construct a transport URL. + * Container for the base URL of a SockJS endpoint with additional helper methods + * to derive related SockJS URLs as the {@link #getInfoUrl() info} URL and + * {@link #getTransportUrl(TransportType) transport} URLs. * * @author Rossen Stoyanchev * @since 4.1 diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/TransportRequest.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/TransportRequest.java index 36b6a534dbe..d0fc7df3195 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/TransportRequest.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/TransportRequest.java @@ -1,15 +1,36 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.web.socket.sockjs.client; import java.net.URI; import java.security.Principal; import org.springframework.http.HttpHeaders; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; /** - * Represents a request to connect to a SockJS service using a specific - * Transport. A single SockJS request however may require falling back - * and therefore multiple TransportRequest instances. + * Exposes information, typically to {@link Transport} and + * {@link AbstractClientSockJsSession session} implementations, about a request + * to connect to a SockJS server endpoint over a given transport. + * + *

Note that a single request to connect via {@link SockJsClient} may result + * in multiple instances of {@link TransportRequest}, one for each transport + * before a connection is successfully established. * * @author Rossen Stoyanchev * @since 4.1 diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/UndertowXhrTransport.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/UndertowXhrTransport.java index 4c14937c162..68c064fa722 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/UndertowXhrTransport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/UndertowXhrTransport.java @@ -20,7 +20,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; -import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -82,6 +81,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrame; * * * @author Brian Clozel + * @author Rossen Stoyanchev * @since 4.1.2 * @see org.xnio.Options */ @@ -90,10 +90,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra private static final AttachmentKey RESPONSE_BODY = AttachmentKey.create(String.class); - private final UndertowClient httpClient; - private final OptionMap optionMap; + private final UndertowClient httpClient; + private final XnioWorker worker; private final Pool bufferPool; @@ -112,29 +112,6 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra } - private static HttpHeaders toHttpHeaders(HeaderMap headerMap) { - HttpHeaders responseHeaders = new HttpHeaders(); - Iterator names = headerMap.getHeaderNames().iterator(); - while (names.hasNext()) { - HttpString name = names.next(); - Iterator values = headerMap.get(name).iterator(); - while (values.hasNext()) { - responseHeaders.add(name.toString(), values.next()); - } - } - return responseHeaders; - } - - private static void addHttpHeaders(ClientRequest request, HttpHeaders headers) { - HeaderMap headerMap = request.getRequestHeaders(); - for (String name : headers.keySet()) { - for (String value : headers.get(name)) { - headerMap.add(HttpString.tryFromString(name), value); - } - } - } - - /** * Return Undertow's native HTTP client */ @@ -152,6 +129,130 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra } + @Override + protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl, + HttpHeaders handshakeHeaders, XhrClientSockJsSession session, + SettableListenableFuture connectFuture) { + + executeReceiveRequest(receiveUrl, handshakeHeaders, session, connectFuture); + } + + private void executeReceiveRequest(final URI url, final HttpHeaders headers, + final XhrClientSockJsSession session, + final SettableListenableFuture connectFuture) { + + if (logger.isTraceEnabled()) { + logger.trace("Starting XHR receive request, url=" + url); + } + + this.httpClient.connect( + new ClientCallback() { + + @Override + public void completed(ClientConnection connection) { + ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(url.getPath()); + HttpString headerName = HttpString.tryFromString(HttpHeaders.HOST); + request.getRequestHeaders().add(headerName, url.getHost()); + addHttpHeaders(request, headers); + connection.sendRequest(request, createReceiveCallback(url, + getRequestHeaders(), session, connectFuture)); + } + + @Override + public void failed(IOException ex) { + throw new SockJsTransportFailureException("Failed to execute request to " + url, ex); + } + }, + url, this.worker, this.bufferPool, this.optionMap); + } + + private static void addHttpHeaders(ClientRequest request, HttpHeaders headers) { + HeaderMap headerMap = request.getRequestHeaders(); + for (String name : headers.keySet()) { + for (String value : headers.get(name)) { + headerMap.add(HttpString.tryFromString(name), value); + } + } + } + + private ClientCallback createReceiveCallback(final URI url, final HttpHeaders headers, + final XhrClientSockJsSession sockJsSession, + final SettableListenableFuture connectFuture) { + + return new ClientCallback() { + + @Override + public void completed(final ClientExchange exchange) { + exchange.setResponseListener(new ClientCallback() { + + @Override + public void completed(ClientExchange result) { + ClientResponse response = result.getResponse(); + if (response.getResponseCode() != 200) { + HttpStatus status = HttpStatus.valueOf(response.getResponseCode()); + IoUtils.safeClose(result.getConnection()); + onFailure(new HttpServerErrorException(status, "Unexpected XHR receive status")); + } + else { + SockJsResponseListener listener = new SockJsResponseListener(result.getConnection(), + url, headers, sockJsSession, connectFuture); + listener.setup(result.getResponseChannel()); + } + if (logger.isTraceEnabled()) { + logger.trace("XHR receive headers: " + toHttpHeaders(response.getResponseHeaders())); + } + try { + StreamSinkChannel channel = result.getRequestChannel(); + channel.shutdownWrites(); + if (!channel.flush()) { + channel.getWriteSetter().set(ChannelListeners.flushingChannelListener(null, null)); + channel.resumeWrites(); + } + } + catch (IOException exc) { + IoUtils.safeClose(result.getConnection()); + onFailure(exc); + } + } + + @Override + public void failed(IOException exc) { + IoUtils.safeClose(exchange.getConnection()); + onFailure(exc); + } + }); + } + + @Override + public void failed(IOException exc) { + onFailure(exc); + } + + private void onFailure(Throwable failure) { + if (connectFuture.setException(failure)) { + return; + } + if (sockJsSession.isDisconnected()) { + sockJsSession.afterTransportClosed(null); + } + else { + sockJsSession.handleTransportError(failure); + sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage())); + } + } + }; + } + + private static HttpHeaders toHttpHeaders(HeaderMap headerMap) { + HttpHeaders httpHeaders = new HttpHeaders(); + for (HttpString name : headerMap.getHeaderNames()) { + for (String value : headerMap.get(name)) { + httpHeaders.add(name.toString(), value); + } + } + return httpHeaders; + } + @Override protected ResponseEntity executeInfoRequestInternal(URI infoUrl) { return executeRequest(infoUrl, Methods.GET, getRequestHeaders(), null); @@ -167,13 +268,14 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra List responses = new CopyOnWriteArrayList(); try { - ClientConnection connection = this.httpClient.connect( - url, this.worker, this.bufferPool, this.optionMap).get(); + ClientConnection connection = this.httpClient.connect(url, this.worker, + this.bufferPool, this.optionMap).get(); try { ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath()); request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost()); if (body != null && !body.isEmpty()) { - request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.CONTENT_LENGTH), body.length()); + HttpString headerName = HttpString.tryFromString(HttpHeaders.CONTENT_LENGTH); + request.getRequestHeaders().add(headerName, body.length()); } addHttpHeaders(request, headers); connection.sendRequest(request, createRequestCallback(body, responses, latch)); @@ -197,13 +299,13 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra catch (InterruptedException ex) { throw new SockJsTransportFailureException("Interrupted while processing request to " + url, ex); } - } private ClientCallback createRequestCallback(final String body, final List responses, final CountDownLatch latch) { return new ClientCallback() { + @Override public void completed(ClientExchange result) { result.setResponseListener(new ClientCallback() { @@ -242,10 +344,12 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra onFailure(latch, ex); } } + @Override public void failed(IOException ex) { onFailure(latch, ex); } + private void onFailure(CountDownLatch latch, IOException ex) { latch.countDown(); throw new SockJsTransportFailureException("Failed to execute request", ex); @@ -253,103 +357,8 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra }; } - @Override - protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl, - HttpHeaders handshakeHeaders, XhrClientSockJsSession session, - SettableListenableFuture connectFuture) { - executeReceiveRequest(receiveUrl, handshakeHeaders, session, connectFuture); - } - - private void executeReceiveRequest(final URI url, final HttpHeaders headers, final XhrClientSockJsSession session, - final SettableListenableFuture connectFuture) { - - if (logger.isTraceEnabled()) { - logger.trace("Starting XHR receive request, url=" + url); - } - - this.httpClient.connect( - new ClientCallback() { - @Override - public void completed(ClientConnection result) { - final ClientRequest httpRequest = new ClientRequest().setMethod(Methods.POST).setPath(url.getPath()); - httpRequest.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost()); - addHttpHeaders(httpRequest, headers); - result.sendRequest(httpRequest, createConnectCallback(url, getRequestHeaders(), session, connectFuture)); - } - @Override - public void failed(IOException ex) { - throw new SockJsTransportFailureException("Failed to execute request to " + url, ex); - } - }, - url, this.worker, this.bufferPool, this.optionMap); - - } - - private ClientCallback createConnectCallback(final URI url, final HttpHeaders headers, - final XhrClientSockJsSession sockJsSession, final SettableListenableFuture connectFuture) { - - return new ClientCallback() { - @Override - public void completed(final ClientExchange result) { - result.setResponseListener(new ClientCallback() { - @Override - public void completed(ClientExchange result) { - ClientResponse response = result.getResponse(); - if (response.getResponseCode() != 200) { - HttpStatus status = HttpStatus.valueOf(response.getResponseCode()); - IoUtils.safeClose(result.getConnection()); - onFailure(new HttpServerErrorException(status, "Unexpected XHR receive status")); - } - else { - SockJsResponseListener listener = new SockJsResponseListener(result.getConnection(), - url, headers, sockJsSession, connectFuture); - listener.setup(result.getResponseChannel()); - } - if (logger.isTraceEnabled()) { - logger.trace("XHR receive headers: " + toHttpHeaders(response.getResponseHeaders())); - } - try { - result.getRequestChannel().shutdownWrites(); - if (!result.getRequestChannel().flush()) { - result.getRequestChannel().getWriteSetter() - .set(ChannelListeners.flushingChannelListener(null, null)); - result.getRequestChannel().resumeWrites(); - } - } - catch (IOException exc) { - IoUtils.safeClose(result.getConnection()); - onFailure(exc); - } - } - @Override - public void failed(IOException exc) { - IoUtils.safeClose(result.getConnection()); - onFailure(exc); - } - }); - } - @Override - public void failed(IOException exc) { - onFailure(exc); - } - private void onFailure(Throwable failure) { - if (connectFuture.setException(failure)) { - return; - } - if (sockJsSession.isDisconnected()) { - sockJsSession.afterTransportClosed(null); - } - else { - sockJsSession.handleTransportError(failure); - sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage())); - } - } - }; - } - - - public class SockJsResponseListener implements ChannelListener { + private class SockJsResponseListener implements ChannelListener { private final ClientConnection connection; @@ -363,8 +372,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - public SockJsResponseListener(ClientConnection connection, URI url, HttpHeaders headers, - XhrClientSockJsSession sockJsSession, SettableListenableFuture connectFuture) { + public SockJsResponseListener(ClientConnection connection, URI url, + HttpHeaders headers, XhrClientSockJsSession sockJsSession, + SettableListenableFuture connectFuture) { this.connection = connection; this.url = url; diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/XhrTransport.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/XhrTransport.java index 955ed8899b9..6fcf7f16518 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/XhrTransport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/XhrTransport.java @@ -1,3 +1,18 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.web.socket.sockjs.client; import java.net.URI; diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java index 2333760242b..0696bfe4be3 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java @@ -17,6 +17,7 @@ package org.springframework.web.socket.sockjs.client; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -27,7 +28,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; - import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -39,7 +39,6 @@ import javax.servlet.http.HttpServletResponse; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -56,6 +55,7 @@ import org.springframework.tests.TestGroup; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.context.support.AnnotationConfigWebApplicationContext; import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketTestServer; import org.springframework.web.socket.config.annotation.EnableWebSocket; @@ -66,7 +66,10 @@ import org.springframework.web.socket.server.HandshakeHandler; import org.springframework.web.socket.server.RequestUpgradeStrategy; import org.springframework.web.socket.server.support.DefaultHandshakeHandler; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Abstract base class for integration tests using the @@ -90,7 +93,7 @@ public abstract class AbstractSockJsIntegrationTests { private AnnotationConfigWebApplicationContext wac; - private ErrorFilter errorFilter; + private TestFilter testFilter; private String baseUrl; @@ -104,12 +107,12 @@ public abstract class AbstractSockJsIntegrationTests { @Before public void setup() throws Exception { logger.debug("Setting up '" + this.testName.getMethodName() + "'"); - this.errorFilter = new ErrorFilter(); + this.testFilter = new TestFilter(); this.wac = new AnnotationConfigWebApplicationContext(); this.wac.register(TestConfig.class, upgradeStrategyConfigClass()); this.server = createWebSocketTestServer(); this.server.setup(); - this.server.deployConfig(this.wac, this.errorFilter); + this.server.deployConfig(this.wac, this.testFilter); // Set ServletContext in WebApplicationContext after deployment but before // starting the server. this.wac.setServletContext(this.server.getServletContext()); @@ -178,25 +181,25 @@ public abstract class AbstractSockJsIntegrationTests { @Test public void receiveOneMessageWebSocket() throws Exception { - testReceiveOneMessage(createWebSocketTransport()); + testReceiveOneMessage(createWebSocketTransport(), null); } @Test public void receiveOneMessageXhrStreaming() throws Exception { - testReceiveOneMessage(createXhrTransport()); + testReceiveOneMessage(createXhrTransport(), null); } @Test public void receiveOneMessageXhr() throws Exception { AbstractXhrTransport xhrTransport = createXhrTransport(); xhrTransport.setXhrStreamingDisabled(true); - testReceiveOneMessage(xhrTransport); + testReceiveOneMessage(xhrTransport, null); } @Test public void infoRequestFailure() throws Exception { TestClientHandler handler = new TestClientHandler(); - this.errorFilter.responseStatusMap.put("/info", 500); + this.testFilter.sendErrorMap.put("/info", 500); CountDownLatch latch = new CountDownLatch(1); initSockJsClient(createWebSocketTransport()); this.sockJsClient.doHandshake(handler, this.baseUrl + "/echo").addCallback( @@ -204,6 +207,7 @@ public abstract class AbstractSockJsIntegrationTests { @Override public void onSuccess(WebSocketSession result) { } + @Override public void onFailure(Throwable ex) { latch.countDown(); @@ -215,8 +219,8 @@ public abstract class AbstractSockJsIntegrationTests { @Test public void fallbackAfterTransportFailure() throws Exception { - this.errorFilter.responseStatusMap.put("/websocket", 200); - this.errorFilter.responseStatusMap.put("/xhr_streaming", 500); + this.testFilter.sendErrorMap.put("/websocket", 200); + this.testFilter.sendErrorMap.put("/xhr_streaming", 500); TestClientHandler handler = new TestClientHandler(); initSockJsClient(createWebSocketTransport(), createXhrTransport()); WebSocketSession session = this.sockJsClient.doHandshake(handler, this.baseUrl + "/echo").get(); @@ -229,8 +233,8 @@ public abstract class AbstractSockJsIntegrationTests { @Test(timeout = 5000) public void fallbackAfterConnectTimeout() throws Exception { TestClientHandler clientHandler = new TestClientHandler(); - this.errorFilter.sleepDelayMap.put("/xhr_streaming", 10000L); - this.errorFilter.responseStatusMap.put("/xhr_streaming", 503); + this.testFilter.sleepDelayMap.put("/xhr_streaming", 10000L); + this.testFilter.sendErrorMap.put("/xhr_streaming", 503); initSockJsClient(createXhrTransport()); this.sockJsClient.setConnectTimeoutScheduler(this.wac.getBean(ThreadPoolTaskScheduler.class)); WebSocketSession clientSession = sockJsClient.doHandshake(clientHandler, this.baseUrl + "/echo").get(); @@ -261,10 +265,12 @@ public abstract class AbstractSockJsIntegrationTests { session.close(); } - private void testReceiveOneMessage(Transport transport) throws Exception { + private void testReceiveOneMessage(Transport transport, WebSocketHttpHeaders headers) + throws Exception { + TestClientHandler clientHandler = new TestClientHandler(); initSockJsClient(transport); - this.sockJsClient.doHandshake(clientHandler, this.baseUrl + "/test").get(); + this.sockJsClient.doHandshake(clientHandler, headers, new URI(this.baseUrl + "/test")).get(); TestServerHandler serverHandler = this.wac.getBean(TestServerHandler.class); assertNotNull("afterConnectionEstablished should have been called", clientHandler.session); @@ -275,6 +281,22 @@ public abstract class AbstractSockJsIntegrationTests { clientHandler.awaitMessage(message, 5000); } + private static void awaitEvent(Supplier condition, long timeToWait, String description) { + long timeToSleep = 200; + for (int i = 0 ; i < Math.floor(timeToWait / timeToSleep); i++) { + if (condition.get()) { + return; + } + try { + Thread.sleep(timeToSleep); + } + catch (InterruptedException e) { + throw new IllegalStateException("Interrupted while waiting for " + description, e); + } + } + throw new IllegalStateException("Timed out waiting for " + description); + } + @Configuration @EnableWebSocket @@ -296,23 +318,6 @@ public abstract class AbstractSockJsIntegrationTests { } } - - private static void awaitEvent(Supplier condition, long timeToWait, String description) { - long timeToSleep = 200; - for (int i = 0 ; i < Math.floor(timeToWait / timeToSleep); i++) { - if (condition.get()) { - return; - } - try { - Thread.sleep(timeToSleep); - } - catch (InterruptedException e) { - throw new IllegalStateException("Interrupted while waiting for " + description, e); - } - } - throw new IllegalStateException("Timed out waiting for " + description); - } - private static class TestClientHandler extends TextWebSocketHandler { private final BlockingQueue receivedMessages = new LinkedBlockingQueue<>(); @@ -356,6 +361,14 @@ public abstract class AbstractSockJsIntegrationTests { } } + private static class EchoHandler extends TextWebSocketHandler { + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + session.sendMessage(message); + } + } + private static class TestServerHandler extends TextWebSocketHandler { private WebSocketSession session; @@ -371,24 +384,23 @@ public abstract class AbstractSockJsIntegrationTests { } } - private static class EchoHandler extends TextWebSocketHandler { + private static class TestFilter implements Filter { - @Override - protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - session.sendMessage(message); - } - } - - private static class ErrorFilter implements Filter { - - private final Map responseStatusMap = new HashMap<>(); + private final List requests = new ArrayList<>(); private final Map sleepDelayMap = new HashMap<>(); + private final Map sendErrorMap = new HashMap<>(); + + @Override - public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException, ServletException { + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + + this.requests.add(request); + for (String suffix : this.sleepDelayMap.keySet()) { - if (((HttpServletRequest) req).getRequestURI().endsWith(suffix)) { + if (((HttpServletRequest) request).getRequestURI().endsWith(suffix)) { try { Thread.sleep(this.sleepDelayMap.get(suffix)); break; @@ -398,17 +410,17 @@ public abstract class AbstractSockJsIntegrationTests { } } } - for (String suffix : this.responseStatusMap.keySet()) { - if (((HttpServletRequest) req).getRequestURI().endsWith(suffix)) { - ((HttpServletResponse) resp).sendError(this.responseStatusMap.get(suffix)); + for (String suffix : this.sendErrorMap.keySet()) { + if (((HttpServletRequest) request).getRequestURI().endsWith(suffix)) { + ((HttpServletResponse) response).sendError(this.sendErrorMap.get(suffix)); return; } } - chain.doFilter(req, resp); + chain.doFilter(request, response); } @Override - public void init(FilterConfig filterConfig) throws ServletException { + public void init(FilterConfig filterConfig) { } @Override