Remove use of simple executor in WebSocket clients

Issue: SPR-11580
This commit is contained in:
Rossen Stoyanchev 2014-03-19 10:24:48 -04:00
parent 8aefcb9a55
commit 9552c82e2d
4 changed files with 71 additions and 23 deletions

View File

@ -28,11 +28,10 @@ import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter;
@ -60,32 +59,33 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma
private final Object lifecycleMonitor = new Object();
private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("WebSocketClient-");
private AsyncListenableTaskExecutor taskExecutor;
/**
* Default constructor that creates an instance of
* {@link org.eclipse.jetty.websocket.client.WebSocketClient} with default settings.
* {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
*/
public JettyWebSocketClient() {
this.client = new org.eclipse.jetty.websocket.client.WebSocketClient();
}
/**
* Constructor that accepts a pre-configured {@link WebSocketClient}.
* Constructor that accepts an existing
* {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
*/
public JettyWebSocketClient(WebSocketClient client) {
super();
this.client = client;
}
/**
* Set a {@link TaskExecutor} to use to open the connection.
* By default {@link SimpleAsyncTaskExecutor} is used.
* Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
*
* <p>If this property is not configured, calls to any of the
* {@code doHandshake} methods will block until the connection is established.
*/
public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "TaskExecutor must not be null");
this.taskExecutor = taskExecutor;
}
@ -189,14 +189,23 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma
final JettyWebSocketSession wsSession = new JettyWebSocketSession(handshakeAttributes, user);
final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession);
return this.taskExecutor.submitListenable(new Callable<WebSocketSession>() {
Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
@Override
public WebSocketSession call() throws Exception {
Future<Session> future = client.connect(listener, uri, request);
future.get();
return wsSession;
}
});
};
if (this.taskExecutor != null) {
return this.taskExecutor.submitListenable(connectTask);
}
else {
ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
task.run();
return task;
}
}
/**

View File

@ -34,11 +34,11 @@ import javax.websocket.HandshakeResponse;
import javax.websocket.WebSocketContainer;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.web.socket.WebSocketExtension;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
@ -58,22 +58,24 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
private final WebSocketContainer webSocketContainer;
private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("WebSocketClient-");
private AsyncListenableTaskExecutor taskExecutor;
/**
* Default constructor that calls {@code ContainerProvider.getWebSocketContainer()}
* to obtain a {@link WebSocketContainer} instance.
* to obtain a (new) {@link WebSocketContainer} instance. Also see constructor
* accepting existing {@code WebSocketContainer} instance.
*/
public StandardWebSocketClient() {
this.webSocketContainer = ContainerProvider.getWebSocketContainer();
}
/**
* Constructor that accepts a pre-configured {@link WebSocketContainer} instance.
* If using XML configuration see {@link WebSocketContainerFactoryBean}. In Java
* Constructor accepting an existing {@link WebSocketContainer} instance.
*
* <p>For XML configuration see {@link WebSocketContainerFactoryBean}. For Java
* configuration use {@code ContainerProvider.getWebSocketContainer()} to obtain
* a container instance.
* the {@code WebSocketContainer} instance.
*/
public StandardWebSocketClient(WebSocketContainer webSocketContainer) {
Assert.notNull(webSocketContainer, "WebSocketContainer must not be null");
@ -82,11 +84,12 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
/**
* Set a {@link TaskExecutor} to use to open the connection.
* By default {@link SimpleAsyncTaskExecutor} is used.
* Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
*
* <p>If this property is not configured, calls to any of the
* {@code doHandshake} methods will block until the connection is established.
*/
public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "TaskExecutor must not be null");
this.taskExecutor = taskExecutor;
}
@ -116,13 +119,22 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
configBuidler.extensions(adaptExtensions(extensions));
final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);
return this.taskExecutor.submitListenable(new Callable<WebSocketSession>() {
Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
@Override
public WebSocketSession call() throws Exception {
webSocketContainer.connectToServer(endpoint, configBuidler.build(), uri);
return session;
}
});
};
if (this.taskExecutor != null) {
return this.taskExecutor.submitListenable(connectTask);
}
else {
ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
task.run();
return task;
}
}
private static List<Extension> adaptExtensions(List<WebSocketExtension> extensions) {

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.CollectionUtils;
import org.springframework.util.SocketUtils;
import org.springframework.web.socket.WebSocketHandler;
@ -88,6 +89,19 @@ public class JettyWebSocketClientTests {
assertEquals("echo", this.wsSession.getAcceptedProtocol());
}
@Test
public void doHandshakeWithTaskExecutor() throws Exception {
WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
headers.setSecWebSocketProtocol(Arrays.asList("echo"));
this.client.setTaskExecutor(new SimpleAsyncTaskExecutor());
this.wsSession = this.client.doHandshake(new TextWebSocketHandler(), headers, new URI(this.wsUrl)).get();
assertEquals(this.wsUrl, this.wsSession.getUri().toString());
assertEquals("echo", this.wsSession.getAcceptedProtocol());
}
private static class TestJettyWebSocketServer {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,6 +30,9 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
@ -134,4 +137,14 @@ public class StandardWebSocketClientTests {
assertEquals(Collections.singletonMap("foo", Arrays.asList("bar")), map);
}
@Test
public void taskExecutor() throws Exception {
URI uri = new URI("ws://example.com/abc");
this.wsClient.setTaskExecutor(new SimpleAsyncTaskExecutor());
WebSocketSession session = this.wsClient.doHandshake(this.wsHandler, this.headers, uri).get();
assertNotNull(session);
}
}