From 9552c82e2d81bcb6c781793e1fe14708bd4db25b Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 19 Mar 2014 10:24:48 -0400 Subject: [PATCH] Remove use of simple executor in WebSocket clients Issue: SPR-11580 --- .../client/jetty/JettyWebSocketClient.java | 31 +++++++++++------ .../standard/StandardWebSocketClient.java | 34 +++++++++++++------ .../jetty/JettyWebSocketClientTests.java | 14 ++++++++ .../StandardWebSocketClientTests.java | 15 +++++++- 4 files changed, 71 insertions(+), 23 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java index 5d83118e32d..29e27c938df 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java @@ -28,11 +28,10 @@ import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.springframework.context.SmartLifecycle; import org.springframework.core.task.AsyncListenableTaskExecutor; -import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.http.HttpHeaders; -import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter; @@ -60,32 +59,33 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma private final Object lifecycleMonitor = new Object(); - private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("WebSocketClient-"); + private AsyncListenableTaskExecutor taskExecutor; /** * Default constructor that creates an instance of - * {@link org.eclipse.jetty.websocket.client.WebSocketClient} with default settings. + * {@link org.eclipse.jetty.websocket.client.WebSocketClient}. */ public JettyWebSocketClient() { this.client = new org.eclipse.jetty.websocket.client.WebSocketClient(); } /** - * Constructor that accepts a pre-configured {@link WebSocketClient}. + * Constructor that accepts an existing + * {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance. */ public JettyWebSocketClient(WebSocketClient client) { - super(); this.client = client; } /** - * Set a {@link TaskExecutor} to use to open the connection. - * By default {@link SimpleAsyncTaskExecutor} is used. + * Set an {@link AsyncListenableTaskExecutor} to use when opening connections. + * + *

If this property is not configured, calls to any of the + * {@code doHandshake} methods will block until the connection is established. */ public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) { - Assert.notNull(taskExecutor, "TaskExecutor must not be null"); this.taskExecutor = taskExecutor; } @@ -189,14 +189,23 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma final JettyWebSocketSession wsSession = new JettyWebSocketSession(handshakeAttributes, user); final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession); - return this.taskExecutor.submitListenable(new Callable() { + Callable connectTask = new Callable() { @Override public WebSocketSession call() throws Exception { Future future = client.connect(listener, uri, request); future.get(); return wsSession; } - }); + }; + + if (this.taskExecutor != null) { + return this.taskExecutor.submitListenable(connectTask); + } + else { + ListenableFutureTask task = new ListenableFutureTask(connectTask); + task.run(); + return task; + } } /** diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java index 0eeabbf03a9..c221e40715f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java @@ -34,11 +34,11 @@ import javax.websocket.HandshakeResponse; import javax.websocket.WebSocketContainer; import org.springframework.core.task.AsyncListenableTaskExecutor; -import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.http.HttpHeaders; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; import org.springframework.web.socket.WebSocketExtension; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; @@ -58,22 +58,24 @@ public class StandardWebSocketClient extends AbstractWebSocketClient { private final WebSocketContainer webSocketContainer; - private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("WebSocketClient-"); + private AsyncListenableTaskExecutor taskExecutor; /** * Default constructor that calls {@code ContainerProvider.getWebSocketContainer()} - * to obtain a {@link WebSocketContainer} instance. + * to obtain a (new) {@link WebSocketContainer} instance. Also see constructor + * accepting existing {@code WebSocketContainer} instance. */ public StandardWebSocketClient() { this.webSocketContainer = ContainerProvider.getWebSocketContainer(); } /** - * Constructor that accepts a pre-configured {@link WebSocketContainer} instance. - * If using XML configuration see {@link WebSocketContainerFactoryBean}. In Java + * Constructor accepting an existing {@link WebSocketContainer} instance. + * + *

For XML configuration see {@link WebSocketContainerFactoryBean}. For Java * configuration use {@code ContainerProvider.getWebSocketContainer()} to obtain - * a container instance. + * the {@code WebSocketContainer} instance. */ public StandardWebSocketClient(WebSocketContainer webSocketContainer) { Assert.notNull(webSocketContainer, "WebSocketContainer must not be null"); @@ -82,11 +84,12 @@ public class StandardWebSocketClient extends AbstractWebSocketClient { /** - * Set a {@link TaskExecutor} to use to open the connection. - * By default {@link SimpleAsyncTaskExecutor} is used. + * Set an {@link AsyncListenableTaskExecutor} to use when opening connections. + * + *

If this property is not configured, calls to any of the + * {@code doHandshake} methods will block until the connection is established. */ public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) { - Assert.notNull(taskExecutor, "TaskExecutor must not be null"); this.taskExecutor = taskExecutor; } @@ -116,13 +119,22 @@ public class StandardWebSocketClient extends AbstractWebSocketClient { configBuidler.extensions(adaptExtensions(extensions)); final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session); - return this.taskExecutor.submitListenable(new Callable() { + Callable connectTask = new Callable() { @Override public WebSocketSession call() throws Exception { webSocketContainer.connectToServer(endpoint, configBuidler.build(), uri); return session; } - }); + }; + + if (this.taskExecutor != null) { + return this.taskExecutor.submitListenable(connectTask); + } + else { + ListenableFutureTask task = new ListenableFutureTask(connectTask); + task.run(); + return task; + } } private static List adaptExtensions(List extensions) { diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java index 87d437d32ea..3335dda08e2 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.util.CollectionUtils; import org.springframework.util.SocketUtils; import org.springframework.web.socket.WebSocketHandler; @@ -88,6 +89,19 @@ public class JettyWebSocketClientTests { assertEquals("echo", this.wsSession.getAcceptedProtocol()); } + @Test + public void doHandshakeWithTaskExecutor() throws Exception { + + WebSocketHttpHeaders headers = new WebSocketHttpHeaders(); + headers.setSecWebSocketProtocol(Arrays.asList("echo")); + + this.client.setTaskExecutor(new SimpleAsyncTaskExecutor()); + this.wsSession = this.client.doHandshake(new TextWebSocketHandler(), headers, new URI(this.wsUrl)).get(); + + assertEquals(this.wsUrl, this.wsSession.getUri().toString()); + assertEquals("echo", this.wsSession.getAcceptedProtocol()); + } + private static class TestJettyWebSocketServer { diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java index c57ec07f277..115d3920108 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,9 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; @@ -134,4 +137,14 @@ public class StandardWebSocketClientTests { assertEquals(Collections.singletonMap("foo", Arrays.asList("bar")), map); } + @Test + public void taskExecutor() throws Exception { + + URI uri = new URI("ws://example.com/abc"); + this.wsClient.setTaskExecutor(new SimpleAsyncTaskExecutor()); + WebSocketSession session = this.wsClient.doHandshake(this.wsHandler, this.headers, uri).get(); + + assertNotNull(session); + } + }