Simplify WebSocket client implementationss in WebFlux

1. Eliminate WebSocketClientSupport base class whose main value was
to provide logging but those methods get in the way of inserting a
log prefix.

2. Remove checks and synchronization in lifecycle methods of Jetty
client since underlying Jetty client already has that.
This commit is contained in:
Rossen Stoyanchev 2018-07-05 21:28:29 -04:00
parent fd69c90fcb
commit 7be2943c03
5 changed files with 82 additions and 114 deletions

View File

@ -17,8 +17,9 @@
package org.springframework.web.reactive.socket.client; package org.springframework.web.reactive.socket.client;
import java.net.URI; import java.net.URI;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
@ -48,16 +49,15 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
*/ */
public class JettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient, Lifecycle { public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
private static final Log logger = LogFactory.getLog(JettyWebSocketClient.class);
private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient; private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient;
private final boolean externallyManaged; private final boolean externallyManaged;
private volatile boolean running = false;
private final Object lifecycleMonitor = new Object();
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
@ -99,43 +99,31 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
@Override @Override
public void start() { public void start() {
if (this.externallyManaged) { if (!this.externallyManaged) {
return; try {
} this.jettyClient.start();
synchronized (this.lifecycleMonitor) { }
if (!isRunning()) { catch (Exception ex) {
try { throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex);
this.running = true;
this.jettyClient.start();
}
catch (Exception ex) {
throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex);
}
} }
} }
} }
@Override @Override
public void stop() { public void stop() {
if (this.externallyManaged) { if (!this.externallyManaged) {
return; try {
} this.jettyClient.stop();
synchronized (this.lifecycleMonitor) { }
if (isRunning()) { catch (Exception ex) {
try { throw new IllegalStateException("Error stopping Jetty WebSocketClient", ex);
this.running = false;
this.jettyClient.stop();
}
catch (Exception ex) {
throw new IllegalStateException("Error stopping Jetty WebSocketClient", ex);
}
} }
} }
} }
@Override @Override
public boolean isRunning() { public boolean isRunning() {
return this.running; return this.jettyClient.isRunning();
} }
@ -153,23 +141,28 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
MonoProcessor<Void> completionMono = MonoProcessor.create(); MonoProcessor<Void> completionMono = MonoProcessor.create();
return Mono.fromCallable( return Mono.fromCallable(
() -> { () -> {
List<String> protocols = beforeHandshake(url, headers, handler); if (logger.isDebugEnabled()) {
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); logger.debug("Connecting to " + url);
upgradeRequest.setSubProtocols(protocols); }
Object jettyHandler = createJettyHandler(url, handler, completionMono); Object jettyHandler = createHandler(url, handler, completionMono);
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols(handler.getSubProtocols());
UpgradeListener upgradeListener = new DefaultUpgradeListener(headers); UpgradeListener upgradeListener = new DefaultUpgradeListener(headers);
return this.jettyClient.connect(jettyHandler, url, upgradeRequest, upgradeListener); return this.jettyClient.connect(jettyHandler, url, request, upgradeListener);
}) })
.then(completionMono); .then(completionMono);
} }
private Object createJettyHandler(URI url, WebSocketHandler handler, MonoProcessor<Void> completion) { private Object createHandler(URI url, WebSocketHandler handler, MonoProcessor<Void> completion) {
return new JettyWebSocketHandlerAdapter(handler, return new JettyWebSocketHandlerAdapter(handler,
session -> { session -> {
UpgradeResponse response = session.getUpgradeResponse(); if (logger.isDebugEnabled()) {
logger.debug("Connected to " + url);
}
HttpHeaders responseHeaders = new HttpHeaders(); HttpHeaders responseHeaders = new HttpHeaders();
response.getHeaders().forEach(responseHeaders::put); session.getUpgradeResponse().getHeaders().forEach(responseHeaders::put);
HandshakeInfo info = afterHandshake(url, responseHeaders); String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
return new JettyWebSocketSession(session, info, this.bufferFactory, completion); return new JettyWebSocketSession(session, info, this.bufferFactory, completion);
}); });
} }

View File

@ -16,8 +16,9 @@
package org.springframework.web.reactive.socket.client; package org.springframework.web.reactive.socket.client;
import java.net.URI; import java.net.URI;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
import reactor.netty.http.websocket.WebsocketInbound; import reactor.netty.http.websocket.WebsocketInbound;
@ -36,7 +37,10 @@ import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSess
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
*/ */
public class ReactorNettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient { public class ReactorNettyWebSocketClient implements WebSocketClient {
private static final Log logger = LogFactory.getLog(ReactorNettyWebSocketClient.class);
private final HttpClient httpClient; private final HttpClient httpClient;
@ -71,15 +75,21 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
} }
@Override @Override
public Mono<Void> execute(URI url, HttpHeaders httpHeaders, WebSocketHandler handler) { public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
List<String> protocols = beforeHandshake(url, httpHeaders, handler); if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
return getHttpClient() return getHttpClient()
.headers(nettyHeaders -> setNettyHeaders(httpHeaders, nettyHeaders)) .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
.websocket(StringUtils.collectionToCommaDelimitedString(protocols)) .websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
.uri(url.toString()) .uri(url.toString())
.handle((inbound, outbound) -> { .handle((inbound, outbound) -> {
HandshakeInfo info = afterHandshake(url, toHttpHeaders(inbound)); if (logger.isDebugEnabled()) {
logger.debug("Connected to " + url);
}
HttpHeaders responseHeaders = toHttpHeaders(inbound);
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc()); NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory); WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
return handler.handle(session); return handler.handle(session);

View File

@ -27,6 +27,8 @@ import javax.websocket.HandshakeResponse;
import javax.websocket.Session; import javax.websocket.Session;
import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketContainer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -47,7 +49,10 @@ import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession;
* @since 5.0 * @since 5.0
* @see <a href="https://www.jcp.org/en/jsr/detail?id=356">https://www.jcp.org/en/jsr/detail?id=356</a> * @see <a href="https://www.jcp.org/en/jsr/detail?id=356">https://www.jcp.org/en/jsr/detail?id=356</a>
*/ */
public class StandardWebSocketClient extends WebSocketClientSupport implements WebSocketClient { public class StandardWebSocketClient implements WebSocketClient {
private static final Log logger = LogFactory.getLog(StandardWebSocketClient.class);
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
@ -94,7 +99,10 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
MonoProcessor<Void> completionMono = MonoProcessor.create(); MonoProcessor<Void> completionMono = MonoProcessor.create();
return Mono.fromCallable( return Mono.fromCallable(
() -> { () -> {
List<String> protocols = beforeHandshake(url, requestHeaders, handler); if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
List<String> protocols = handler.getSubProtocols();
DefaultConfigurator configurator = new DefaultConfigurator(requestHeaders); DefaultConfigurator configurator = new DefaultConfigurator(requestHeaders);
Endpoint endpoint = createEndpoint(url, handler, completionMono, configurator); Endpoint endpoint = createEndpoint(url, handler, completionMono, configurator);
ClientEndpointConfig config = createEndpointConfig(configurator, protocols); ClientEndpointConfig config = createEndpointConfig(configurator, protocols);
@ -108,8 +116,12 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
MonoProcessor<Void> completion, DefaultConfigurator configurator) { MonoProcessor<Void> completion, DefaultConfigurator configurator) {
return new StandardWebSocketHandlerAdapter(handler, session -> { return new StandardWebSocketHandlerAdapter(handler, session -> {
if (logger.isDebugEnabled()) {
logger.debug("Connected to " + url);
}
HttpHeaders responseHeaders = configurator.getResponseHeaders(); HttpHeaders responseHeaders = configurator.getResponseHeaders();
HandshakeInfo info = afterHandshake(url, responseHeaders); String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
return createWebSocketSession(session, info, completion); return createWebSocketSession(session, info, completion);
}); });
} }

View File

@ -28,6 +28,8 @@ import io.undertow.server.DefaultByteBufferPool;
import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder; import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder;
import io.undertow.websockets.client.WebSocketClientNegotiation; import io.undertow.websockets.client.WebSocketClientNegotiation;
import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.core.WebSocketChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.xnio.IoFuture; import org.xnio.IoFuture;
import org.xnio.XnioWorker; import org.xnio.XnioWorker;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -50,7 +52,9 @@ import org.springframework.web.reactive.socket.adapter.UndertowWebSocketSession;
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
*/ */
public class UndertowWebSocketClient extends WebSocketClientSupport implements WebSocketClient { public class UndertowWebSocketClient implements WebSocketClient {
private static final Log logger = LogFactory.getLog(UndertowWebSocketClient.class);
private static final int DEFAULT_POOL_BUFFER_SIZE = 8192; private static final int DEFAULT_POOL_BUFFER_SIZE = 8192;
@ -153,8 +157,11 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
MonoProcessor<Void> completion = MonoProcessor.create(); MonoProcessor<Void> completion = MonoProcessor.create();
return Mono.fromCallable( return Mono.fromCallable(
() -> { () -> {
if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
List<String> protocols = handler.getSubProtocols();
ConnectionBuilder builder = createConnectionBuilder(url); ConnectionBuilder builder = createConnectionBuilder(url);
List<String> protocols = beforeHandshake(url, headers, handler);
DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers, builder); DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers, builder);
builder.setClientNegotiation(negotiation); builder.setClientNegotiation(negotiation);
return builder.connect().addNotifier( return builder.connect().addNotifier(
@ -165,7 +172,7 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
} }
@Override @Override
public void handleFailed(IOException ex, Object attachment) { public void handleFailed(IOException ex, Object attachment) {
completion.onError(new IllegalStateException("Failed to connect", ex)); completion.onError(new IllegalStateException("Failed to connect to " + url, ex));
} }
}, null); }, null);
}) })
@ -189,7 +196,12 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion, private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
DefaultNegotiation negotiation, WebSocketChannel channel) { DefaultNegotiation negotiation, WebSocketChannel channel) {
HandshakeInfo info = afterHandshake(url, negotiation.getResponseHeaders()); if (logger.isDebugEnabled()) {
logger.debug("Connected to " + url);
}
HttpHeaders responseHeaders = negotiation.getResponseHeaders();
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, this.bufferFactory, completion); UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, this.bufferFactory, completion);
UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session); UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session);

View File

@ -1,59 +0,0 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.client;
import java.net.URI;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
* Base class for {@link WebSocketClient} implementations.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class WebSocketClientSupport {
private static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
protected final Log logger = LogFactory.getLog(getClass());
protected List<String> beforeHandshake(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
return handler.getSubProtocols();
}
protected HandshakeInfo afterHandshake(URI url, HttpHeaders responseHeaders) {
if (logger.isDebugEnabled()) {
logger.debug("Connected to " + url + ", " + responseHeaders);
}
String protocol = responseHeaders.getFirst(SEC_WEBSOCKET_PROTOCOL);
return new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
}
}