diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java
index 5d83118e32d..29e27c938df 100644
--- a/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java
+++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java
@@ -28,11 +28,10 @@ import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncListenableTaskExecutor;
-import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
-import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter;
@@ -60,32 +59,33 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma
private final Object lifecycleMonitor = new Object();
- private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("WebSocketClient-");
+ private AsyncListenableTaskExecutor taskExecutor;
/**
* Default constructor that creates an instance of
- * {@link org.eclipse.jetty.websocket.client.WebSocketClient} with default settings.
+ * {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
*/
public JettyWebSocketClient() {
this.client = new org.eclipse.jetty.websocket.client.WebSocketClient();
}
/**
- * Constructor that accepts a pre-configured {@link WebSocketClient}.
+ * Constructor that accepts an existing
+ * {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
*/
public JettyWebSocketClient(WebSocketClient client) {
- super();
this.client = client;
}
/**
- * Set a {@link TaskExecutor} to use to open the connection.
- * By default {@link SimpleAsyncTaskExecutor} is used.
+ * Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
+ *
+ *
If this property is not configured, calls to any of the
+ * {@code doHandshake} methods will block until the connection is established.
*/
public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
- Assert.notNull(taskExecutor, "TaskExecutor must not be null");
this.taskExecutor = taskExecutor;
}
@@ -189,14 +189,23 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma
final JettyWebSocketSession wsSession = new JettyWebSocketSession(handshakeAttributes, user);
final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession);
- return this.taskExecutor.submitListenable(new Callable() {
+ Callable connectTask = new Callable() {
@Override
public WebSocketSession call() throws Exception {
Future future = client.connect(listener, uri, request);
future.get();
return wsSession;
}
- });
+ };
+
+ if (this.taskExecutor != null) {
+ return this.taskExecutor.submitListenable(connectTask);
+ }
+ else {
+ ListenableFutureTask task = new ListenableFutureTask(connectTask);
+ task.run();
+ return task;
+ }
}
/**
diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java
index 0eeabbf03a9..c221e40715f 100644
--- a/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java
+++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java
@@ -34,11 +34,11 @@ import javax.websocket.HandshakeResponse;
import javax.websocket.WebSocketContainer;
import org.springframework.core.task.AsyncListenableTaskExecutor;
-import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.web.socket.WebSocketExtension;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
@@ -58,22 +58,24 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
private final WebSocketContainer webSocketContainer;
- private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("WebSocketClient-");
+ private AsyncListenableTaskExecutor taskExecutor;
/**
* Default constructor that calls {@code ContainerProvider.getWebSocketContainer()}
- * to obtain a {@link WebSocketContainer} instance.
+ * to obtain a (new) {@link WebSocketContainer} instance. Also see constructor
+ * accepting existing {@code WebSocketContainer} instance.
*/
public StandardWebSocketClient() {
this.webSocketContainer = ContainerProvider.getWebSocketContainer();
}
/**
- * Constructor that accepts a pre-configured {@link WebSocketContainer} instance.
- * If using XML configuration see {@link WebSocketContainerFactoryBean}. In Java
+ * Constructor accepting an existing {@link WebSocketContainer} instance.
+ *
+ * For XML configuration see {@link WebSocketContainerFactoryBean}. For Java
* configuration use {@code ContainerProvider.getWebSocketContainer()} to obtain
- * a container instance.
+ * the {@code WebSocketContainer} instance.
*/
public StandardWebSocketClient(WebSocketContainer webSocketContainer) {
Assert.notNull(webSocketContainer, "WebSocketContainer must not be null");
@@ -82,11 +84,12 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
/**
- * Set a {@link TaskExecutor} to use to open the connection.
- * By default {@link SimpleAsyncTaskExecutor} is used.
+ * Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
+ *
+ *
If this property is not configured, calls to any of the
+ * {@code doHandshake} methods will block until the connection is established.
*/
public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
- Assert.notNull(taskExecutor, "TaskExecutor must not be null");
this.taskExecutor = taskExecutor;
}
@@ -116,13 +119,22 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
configBuidler.extensions(adaptExtensions(extensions));
final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);
- return this.taskExecutor.submitListenable(new Callable() {
+ Callable connectTask = new Callable() {
@Override
public WebSocketSession call() throws Exception {
webSocketContainer.connectToServer(endpoint, configBuidler.build(), uri);
return session;
}
- });
+ };
+
+ if (this.taskExecutor != null) {
+ return this.taskExecutor.submitListenable(connectTask);
+ }
+ else {
+ ListenableFutureTask task = new ListenableFutureTask(connectTask);
+ task.run();
+ return task;
+ }
}
private static List adaptExtensions(List extensions) {
diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java
index 87d437d32ea..3335dda08e2 100644
--- a/spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java
+++ b/spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java
@@ -28,6 +28,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.CollectionUtils;
import org.springframework.util.SocketUtils;
import org.springframework.web.socket.WebSocketHandler;
@@ -88,6 +89,19 @@ public class JettyWebSocketClientTests {
assertEquals("echo", this.wsSession.getAcceptedProtocol());
}
+ @Test
+ public void doHandshakeWithTaskExecutor() throws Exception {
+
+ WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
+ headers.setSecWebSocketProtocol(Arrays.asList("echo"));
+
+ this.client.setTaskExecutor(new SimpleAsyncTaskExecutor());
+ this.wsSession = this.client.doHandshake(new TextWebSocketHandler(), headers, new URI(this.wsUrl)).get();
+
+ assertEquals(this.wsUrl, this.wsSession.getUri().toString());
+ assertEquals("echo", this.wsSession.getAcceptedProtocol());
+ }
+
private static class TestJettyWebSocketServer {
diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java
index c57ec07f277..115d3920108 100644
--- a/spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java
+++ b/spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2013 the original author or authors.
+ * Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,9 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.springframework.core.task.AsyncListenableTaskExecutor;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.core.task.TaskExecutor;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
@@ -134,4 +137,14 @@ public class StandardWebSocketClientTests {
assertEquals(Collections.singletonMap("foo", Arrays.asList("bar")), map);
}
+ @Test
+ public void taskExecutor() throws Exception {
+
+ URI uri = new URI("ws://example.com/abc");
+ this.wsClient.setTaskExecutor(new SimpleAsyncTaskExecutor());
+ WebSocketSession session = this.wsClient.doHandshake(this.wsHandler, this.headers, uri).get();
+
+ assertNotNull(session);
+ }
+
}