Support for Jetty 10

Closes gh-26123
This commit is contained in:
Rossen Stoyanchev 2021-01-21 20:16:21 +00:00
parent e537844a09
commit aa7584d252
11 changed files with 1006 additions and 72 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,11 +16,13 @@
package org.springframework.http.client.reactive; package org.springframework.http.client.reactive;
import java.lang.reflect.Method;
import java.net.HttpCookie; import java.net.HttpCookie;
import java.util.List; import java.util.List;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.reactive.client.ReactiveResponse; import org.eclipse.jetty.reactive.client.ReactiveResponse;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -30,9 +32,11 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie; import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
/** /**
* {@link ClientHttpResponse} implementation for the Jetty ReactiveStreams HTTP client. * {@link ClientHttpResponse} implementation for the Jetty ReactiveStreams HTTP client.
@ -46,6 +50,11 @@ class JettyClientHttpResponse implements ClientHttpResponse {
private static final Pattern SAMESITE_PATTERN = Pattern.compile("(?i).*SameSite=(Strict|Lax|None).*"); private static final Pattern SAMESITE_PATTERN = Pattern.compile("(?i).*SameSite=(Strict|Lax|None).*");
private static final ClassLoader loader = JettyClientHttpResponse.class.getClassLoader();
private static final boolean jetty10Present = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
private final ReactiveResponse reactiveResponse; private final ReactiveResponse reactiveResponse;
@ -58,8 +67,11 @@ class JettyClientHttpResponse implements ClientHttpResponse {
this.reactiveResponse = reactiveResponse; this.reactiveResponse = reactiveResponse;
this.content = Flux.from(content); this.content = Flux.from(content);
MultiValueMap<String, String> adapter = new JettyHeadersAdapter(reactiveResponse.getHeaders()); MultiValueMap<String, String> headers = (jetty10Present ?
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); Jetty10HttpFieldsHelper.getHttpHeaders(reactiveResponse) :
new JettyHeadersAdapter(reactiveResponse.getHeaders()));
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
} }
@ -110,4 +122,38 @@ class JettyClientHttpResponse implements ClientHttpResponse {
return this.headers; return this.headers;
} }
private static class Jetty10HttpFieldsHelper {
private static final Method getHeadersMethod;
private static final Method getNameMethod;
private static final Method getValueMethod;
static {
try {
getHeadersMethod = Response.class.getMethod("getHeaders");
Class<?> type = loader.loadClass("org.eclipse.jetty.http.HttpField");
getNameMethod = type.getMethod("getName");
getValueMethod = type.getMethod("getValue");
}
catch (ClassNotFoundException | NoSuchMethodException ex) {
throw new IllegalStateException("No compatible Jetty version found", ex);
}
}
public static HttpHeaders getHttpHeaders(ReactiveResponse response) {
HttpHeaders headers = new HttpHeaders();
Iterable<?> iterator = (Iterable<?>)
ReflectionUtils.invokeMethod(getHeadersMethod, response.getResponse());
for (Object field : iterator) {
headers.add(
(String) ReflectionUtils.invokeMethod(getNameMethod, field),
(String) ReflectionUtils.invokeMethod(getValueMethod, field));
}
return headers;
}
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -36,6 +36,7 @@ import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
/** /**
@ -49,6 +50,10 @@ import org.springframework.util.MultiValueMap;
*/ */
public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter { public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
private static final boolean jetty10Present = ClassUtils.isPresent(
"org.eclipse.jetty.http.CookieCutter", JettyHttpHandlerAdapter.class.getClassLoader());
public JettyHttpHandlerAdapter(HttpHandler httpHandler) { public JettyHttpHandlerAdapter(HttpHandler httpHandler) {
super(httpHandler); super(httpHandler);
} }
@ -58,16 +63,29 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
protected ServletServerHttpRequest createRequest(HttpServletRequest request, AsyncContext context) protected ServletServerHttpRequest createRequest(HttpServletRequest request, AsyncContext context)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
// TODO: need to compile against Jetty 10 to use HttpFields (class->interface)
if (jetty10Present) {
return super.createRequest(request, context);
}
Assert.notNull(getServletPath(), "Servlet path is not initialized"); Assert.notNull(getServletPath(), "Servlet path is not initialized");
return new JettyServerHttpRequest(request, context, getServletPath(), getDataBufferFactory(), getBufferSize()); return new JettyServerHttpRequest(
request, context, getServletPath(), getDataBufferFactory(), getBufferSize());
} }
@Override @Override
protected ServletServerHttpResponse createResponse(HttpServletResponse response, protected ServletServerHttpResponse createResponse(HttpServletResponse response,
AsyncContext context, ServletServerHttpRequest request) throws IOException { AsyncContext context, ServletServerHttpRequest request) throws IOException {
return new JettyServerHttpResponse( // TODO: need to compile against Jetty 10 to use HttpFields (class->interface)
response, context, getDataBufferFactory(), getBufferSize(), request); if (jetty10Present) {
return new BaseJettyServerHttpResponse(
response, context, getDataBufferFactory(), getBufferSize(), request);
}
else {
return new JettyServerHttpResponse(
response, context, getDataBufferFactory(), getBufferSize(), request);
}
} }
@ -87,7 +105,34 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
} }
private static final class JettyServerHttpResponse extends ServletServerHttpResponse { private static class BaseJettyServerHttpResponse extends ServletServerHttpResponse {
BaseJettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request)
throws IOException {
super(response, asyncContext, bufferFactory, bufferSize, request);
}
BaseJettyServerHttpResponse(HttpHeaders headers, HttpServletResponse response, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request)
throws IOException {
super(headers, response, asyncContext, bufferFactory, bufferSize, request);
}
@Override
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ByteBuffer input = dataBuffer.asByteBuffer();
int len = input.remaining();
ServletResponse response = getNativeResponse();
((HttpOutput) response.getOutputStream()).write(input);
return len;
}
}
private static final class JettyServerHttpResponse extends BaseJettyServerHttpResponse {
JettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, JettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request) DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request)
@ -124,15 +169,6 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
response.setContentLengthLong(contentLength); response.setContentLengthLong(contentLength);
} }
} }
@Override
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ByteBuffer input = dataBuffer.asByteBuffer();
int len = input.remaining();
ServletResponse response = getNativeResponse();
((HttpOutput) response.getOutputStream()).write(input);
return len;
}
} }
} }

View File

@ -0,0 +1,147 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Identical to {@link JettyWebSocketHandlerAdapter}, only excluding the
* {@code onWebSocketFrame} method, since the {@link Frame} argument has moved
* to a different package in Jetty 10.
*
* @author Rossen Stoyanchev
* @since 5.3.4
*/
@WebSocket
public class Jetty10WebSocketHandlerAdapter {
private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
private final WebSocketHandler delegateHandler;
private final Function<Session, JettyWebSocketSession> sessionFactory;
@Nullable
private JettyWebSocketSession delegateSession;
public Jetty10WebSocketHandlerAdapter(WebSocketHandler handler,
Function<Session, JettyWebSocketSession> sessionFactory) {
Assert.notNull(handler, "WebSocketHandler is required");
Assert.notNull(sessionFactory, "'sessionFactory' is required");
this.delegateHandler = handler;
this.sessionFactory = sessionFactory;
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
this.delegateSession = this.sessionFactory.apply(session);
this.delegateHandler.handle(this.delegateSession)
.checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]")
.subscribe(this.delegateSession);
}
@OnWebSocketMessage
public void onWebSocketText(String message) {
if (this.delegateSession != null) {
WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message);
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
}
}
@OnWebSocketMessage
public void onWebSocketBinary(byte[] message, int offset, int length) {
if (this.delegateSession != null) {
ByteBuffer buffer = ByteBuffer.wrap(message, offset, length);
WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer);
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
}
}
// TODO: onWebSocketFrame can't be declared without compiling against Jetty 10
// Jetty 10: org.eclipse.jetty.websocket.api.Frame
// Jetty 9: org.eclipse.jetty.websocket.api.extensions.Frame
// @OnWebSocketFrame
// public void onWebSocketFrame(Frame frame) {
// if (this.delegateSession != null) {
// if (OpCode.PONG == frame.getOpCode()) {
// ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
// WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer);
// this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
// }
// }
// }
private <T> WebSocketMessage toMessage(Type type, T message) {
WebSocketSession session = this.delegateSession;
Assert.state(session != null, "Cannot create message without a session");
if (Type.TEXT.equals(type)) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = session.bufferFactory().wrap(bytes);
return new WebSocketMessage(Type.TEXT, buffer);
}
else if (Type.BINARY.equals(type)) {
DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message);
return new WebSocketMessage(Type.BINARY, buffer);
}
else if (Type.PONG.equals(type)) {
DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message);
return new WebSocketMessage(Type.PONG, buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
}
}
@OnWebSocketClose
public void onWebSocketClose(int statusCode, String reason) {
if (this.delegateSession != null) {
this.delegateSession.handleClose(CloseStatus.create(statusCode, reason));
}
}
@OnWebSocketError
public void onWebSocketError(Throwable cause) {
if (this.delegateSession != null) {
this.delegateSession.handleError(cause);
}
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -17,7 +17,9 @@
package org.springframework.web.reactive.socket.client; package org.springframework.web.reactive.socket.client;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URI; import java.net.URI;
import java.util.function.Function;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -32,9 +34,12 @@ import reactor.core.publisher.Sinks;
import org.springframework.context.Lifecycle; import org.springframework.context.Lifecycle;
import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler; import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler;
import org.springframework.web.reactive.socket.adapter.Jetty10WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
@ -53,6 +58,16 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
*/ */
public class JettyWebSocketClient implements WebSocketClient, Lifecycle { public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
private static ClassLoader loader = JettyWebSocketClient.class.getClassLoader();
private static final boolean jetty10Present;
static {
jetty10Present = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.client.JettyUpgradeListener", loader);
}
private static final Log logger = LogFactory.getLog(JettyWebSocketClient.class); private static final Log logger = LogFactory.getLog(JettyWebSocketClient.class);
@ -60,6 +75,9 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
private final boolean externallyManaged; private final boolean externallyManaged;
private final UpgradeHelper upgradeHelper =
(jetty10Present ? new Jetty10UpgradeHelper() : new Jetty9UpgradeHelper());
/** /**
* Default constructor that creates and manages an instance of a Jetty * Default constructor that creates and manages an instance of a Jetty
@ -147,22 +165,19 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
url, ContextWebSocketHandler.decorate(handler, contextView), completionSink); url, ContextWebSocketHandler.decorate(handler, contextView), completionSink);
ClientUpgradeRequest request = new ClientUpgradeRequest(); ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols(handler.getSubProtocols()); request.setSubProtocols(handler.getSubProtocols());
UpgradeListener upgradeListener = new DefaultUpgradeListener(headers); return this.upgradeHelper.upgrade(
try { this.jettyClient, jettyHandler, url, request, headers, completionSink);
this.jettyClient.connect(jettyHandler, url, request, upgradeListener);
return completionSink.asMono();
}
catch (IOException ex) {
return Mono.error(ex);
}
}); });
} }
private Object createHandler(URI url, WebSocketHandler handler, Sinks.Empty<Void> completion) { private Object createHandler(URI url, WebSocketHandler handler, Sinks.Empty<Void> completion) {
return new JettyWebSocketHandlerAdapter(handler, session -> { Function<Session, JettyWebSocketSession> sessionFactory = session -> {
HandshakeInfo info = createHandshakeInfo(url, session); HandshakeInfo info = createHandshakeInfo(url, session);
return new JettyWebSocketSession(session, info, DefaultDataBufferFactory.sharedInstance, completion); return new JettyWebSocketSession(session, info, DefaultDataBufferFactory.sharedInstance, completion);
}); };
return (jetty10Present ?
new Jetty10WebSocketHandlerAdapter(handler, sessionFactory) :
new JettyWebSocketHandlerAdapter(handler, sessionFactory));
} }
private HandshakeInfo createHandshakeInfo(URI url, Session jettySession) { private HandshakeInfo createHandshakeInfo(URI url, Session jettySession) {
@ -173,6 +188,34 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
} }
/**
* Encapsulate incompatible changes between Jetty 9.4 and 10.
*/
private interface UpgradeHelper {
Mono<Void> upgrade(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient,
Object jettyHandler, URI url, ClientUpgradeRequest request, HttpHeaders headers,
Sinks.Empty<Void> completionSink);
}
private static class Jetty9UpgradeHelper implements UpgradeHelper {
@Override
public Mono<Void> upgrade(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient,
Object jettyHandler, URI url, ClientUpgradeRequest request, HttpHeaders headers,
Sinks.Empty<Void> completionSink) {
try {
jettyClient.connect(jettyHandler, url, request, new DefaultUpgradeListener(headers));
return completionSink.asMono();
}
catch (IOException ex) {
return Mono.error(ex);
}
}
}
private static class DefaultUpgradeListener implements UpgradeListener { private static class DefaultUpgradeListener implements UpgradeListener {
private final HttpHeaders headers; private final HttpHeaders headers;
@ -192,4 +235,32 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
} }
} }
private static class Jetty10UpgradeHelper implements UpgradeHelper {
// On Jetty 9 returns Future, on Jetty 10 returns CompletableFuture
private static final Method connectMethod;
static {
try {
Class<?> type = loader.loadClass("org.eclipse.jetty.websocket.client.WebSocketClient");
connectMethod = type.getMethod("connect", Object.class, URI.class, ClientUpgradeRequest.class);
}
catch (ClassNotFoundException | NoSuchMethodException ex) {
throw new IllegalStateException("No compatible Jetty version found", ex);
}
}
@Override
public Mono<Void> upgrade(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient,
Object jettyHandler, URI url, ClientUpgradeRequest request, HttpHeaders headers,
Sinks.Empty<Void> completionSink) {
// TODO: pass JettyUpgradeListener argument to set headers from HttpHeaders (like we do for Jetty 9)
// which would require a JDK Proxy since it is new in Jetty 10
ReflectionUtils.invokeMethod(connectMethod, jettyClient, jettyHandler, url, request);
return completionSink.asMono();
}
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -68,16 +68,19 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
private static final boolean jettyPresent; private static final boolean jettyPresent;
private static final boolean jetty10Present;
private static final boolean undertowPresent; private static final boolean undertowPresent;
private static final boolean reactorNettyPresent; private static final boolean reactorNettyPresent;
static { static {
ClassLoader classLoader = HandshakeWebSocketService.class.getClassLoader(); ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();
tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader); tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);
jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader); jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);
undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", classLoader); jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", classLoader); undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);
reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);
} }
@ -117,6 +120,9 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
else if (jettyPresent) { else if (jettyPresent) {
className = "JettyRequestUpgradeStrategy"; className = "JettyRequestUpgradeStrategy";
} }
else if (jetty10Present) {
className = "Jetty10RequestUpgradeStrategy";
}
else if (undertowPresent) { else if (undertowPresent) {
className = "UndertowRequestUpgradeStrategy"; className = "UndertowRequestUpgradeStrategy";
} }

View File

@ -0,0 +1,154 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.lang.reflect.Method;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import reactor.core.publisher.Mono;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.target.EmptyTargetSource;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler;
import org.springframework.web.reactive.socket.adapter.Jetty10WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
/**
* A {@link RequestUpgradeStrategy} for use with Jetty 10.
*
* @author Rossen Stoyanchev
* @since 5.3.4
*/
public class Jetty10RequestUpgradeStrategy implements RequestUpgradeStrategy {
private static final Class<?> webSocketCreatorClass;
private static final Method getContainerMethod;
private static final Method upgradeMethod;
private static final Method setAcceptedSubProtocol;
static {
ClassLoader loader = Jetty10RequestUpgradeStrategy.class.getClassLoader();
try {
webSocketCreatorClass = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketCreator");
Class<?> type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer");
getContainerMethod = type.getMethod("getContainer", ServletContext.class);
upgradeMethod = ReflectionUtils.findMethod(type, "upgrade", (Class<?>[]) null);
type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse");
setAcceptedSubProtocol = type.getMethod("setAcceptedSubProtocol", String.class);
}
catch (Exception ex) {
throw new IllegalStateException("No compatible Jetty version found", ex);
}
}
@Override
public Mono<Void> upgrade(
ServerWebExchange exchange, WebSocketHandler handler,
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
HttpServletRequest servletRequest = ServerHttpRequestDecorator.getNativeRequest(request);
HttpServletResponse servletResponse = ServerHttpResponseDecorator.getNativeResponse(response);
ServletContext servletContext = servletRequest.getServletContext();
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
DataBufferFactory factory = response.bufferFactory();
// Trigger WebFlux preCommit actions and upgrade
return exchange.getResponse().setComplete()
.then(Mono.deferContextual(contextView -> {
Jetty10WebSocketHandlerAdapter adapter = new Jetty10WebSocketHandlerAdapter(
ContextWebSocketHandler.decorate(handler, contextView),
session -> new JettyWebSocketSession(session, handshakeInfo, factory));
try {
Object creator = createJettyWebSocketCreator(adapter, subProtocol);
Object container = ReflectionUtils.invokeMethod(getContainerMethod, null, servletContext);
ReflectionUtils.invokeMethod(upgradeMethod, container, creator, servletRequest, servletResponse);
}
catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
}));
}
private static Object createJettyWebSocketCreator(
Jetty10WebSocketHandlerAdapter adapter, @Nullable String protocol) {
ProxyFactory factory = new ProxyFactory(EmptyTargetSource.INSTANCE);
factory.addInterface(webSocketCreatorClass);
factory.addAdvice(new WebSocketCreatorInterceptor(adapter, protocol));
return factory.getProxy();
}
/**
* Proxy for a JettyWebSocketCreator to supply the WebSocket handler and set the sub-protocol.
*/
private static class WebSocketCreatorInterceptor implements MethodInterceptor {
private final Jetty10WebSocketHandlerAdapter adapter;
@Nullable
private final String protocol;
public WebSocketCreatorInterceptor(
Jetty10WebSocketHandlerAdapter adapter, @Nullable String protocol) {
this.adapter = adapter;
this.protocol = protocol;
}
@Nullable
@Override
public Object invoke(@NonNull MethodInvocation invocation) {
if (this.protocol != null) {
ReflectionUtils.invokeMethod(
setAcceptedSubProtocol, invocation.getArguments()[2], this.protocol);
}
return this.adapter;
}
}
}

View File

@ -0,0 +1,137 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.adapter.jetty;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.springframework.util.Assert;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator;
/**
* Identical to {@link JettyWebSocketHandlerAdapter}, only excluding the
* {@code onWebSocketFrame} method, since the {@link Frame} argument has moved
* to a different package in Jetty 10.
*
* @author Rossen Stoyanchev
* @since 5.3.4
*/
@WebSocket
public class Jetty10WebSocketHandlerAdapter {
private static final Log logger = LogFactory.getLog(Jetty10WebSocketHandlerAdapter.class);
private final WebSocketHandler webSocketHandler;
private final JettyWebSocketSession wsSession;
public Jetty10WebSocketHandlerAdapter(WebSocketHandler webSocketHandler, JettyWebSocketSession wsSession) {
Assert.notNull(webSocketHandler, "WebSocketHandler must not be null");
Assert.notNull(wsSession, "WebSocketSession must not be null");
this.webSocketHandler = webSocketHandler;
this.wsSession = wsSession;
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
try {
this.wsSession.initializeNativeSession(session);
this.webSocketHandler.afterConnectionEstablished(this.wsSession);
}
catch (Exception ex) {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
}
}
@OnWebSocketMessage
public void onWebSocketText(String payload) {
TextMessage message = new TextMessage(payload);
try {
this.webSocketHandler.handleMessage(this.wsSession, message);
}
catch (Exception ex) {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
}
}
@OnWebSocketMessage
public void onWebSocketBinary(byte[] payload, int offset, int length) {
BinaryMessage message = new BinaryMessage(payload, offset, length, true);
try {
this.webSocketHandler.handleMessage(this.wsSession, message);
}
catch (Exception ex) {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
}
}
// TODO: onWebSocketFrame can't be declared without compiling against Jetty 10
// Jetty 10: org.eclipse.jetty.websocket.api.Frame
// Jetty 9: org.eclipse.jetty.websocket.api.extensions.Frame
// @OnWebSocketFrame
// public void onWebSocketFrame(Frame frame) {
// if (OpCode.PONG == frame.getOpCode()) {
// ByteBuffer payload = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD;
// PongMessage message = new PongMessage(payload);
// try {
// this.webSocketHandler.handleMessage(this.wsSession, message);
// }
// catch (Exception ex) {
// ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
// }
// }
// }
@OnWebSocketClose
public void onWebSocketClose(int statusCode, String reason) {
CloseStatus closeStatus = new CloseStatus(statusCode, reason);
try {
this.webSocketHandler.afterConnectionClosed(this.wsSession, closeStatus);
}
catch (Exception ex) {
if (logger.isWarnEnabled()) {
logger.warn("Unhandled exception after connection closed for " + this, ex);
}
}
}
@OnWebSocketError
public void onWebSocketError(Throwable cause) {
try {
this.webSocketHandler.handleTransportError(this.wsSession, cause);
}
catch (Exception ex) {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
}
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@
package org.springframework.web.socket.adapter.jetty; package org.springframework.web.socket.adapter.jetty;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.security.Principal; import java.security.Principal;
@ -27,13 +28,14 @@ import java.util.Map;
import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PingMessage; import org.springframework.web.socket.PingMessage;
@ -54,6 +56,12 @@ import org.springframework.web.socket.adapter.AbstractWebSocketSession;
*/ */
public class JettyWebSocketSession extends AbstractWebSocketSession<Session> { public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
private static final ClassLoader loader = JettyWebSocketSession.class.getClassLoader();
private static final boolean jetty10Present = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
private final String id; private final String id;
@Nullable @Nullable
@ -71,6 +79,8 @@ public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
@Nullable @Nullable
private Principal user; private Principal user;
private final SessionHelper sessionHelper;
/** /**
* Create a new {@link JettyWebSocketSession} instance. * Create a new {@link JettyWebSocketSession} instance.
@ -91,6 +101,7 @@ public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
super(attributes); super(attributes);
this.id = idGenerator.generateId().toString(); this.id = idGenerator.generateId().toString();
this.user = user; this.user = user;
this.sessionHelper = (jetty10Present ? new Jetty10SessionHelper() : new Jetty9SessionHelper());
} }
@ -141,28 +152,32 @@ public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
return getNativeSession().getRemoteAddress(); return getNativeSession().getRemoteAddress();
} }
/**
* This method is a no-op for Jetty. As per {@link Session#getPolicy()}, the
* returned {@code WebSocketPolicy} is read-only and changing it has no effect.
*/
@Override @Override
public void setTextMessageSizeLimit(int messageSizeLimit) { public void setTextMessageSizeLimit(int messageSizeLimit) {
checkNativeSessionInitialized();
getNativeSession().getPolicy().setMaxTextMessageSize(messageSizeLimit);
} }
@Override @Override
public int getTextMessageSizeLimit() { public int getTextMessageSizeLimit() {
checkNativeSessionInitialized(); checkNativeSessionInitialized();
return getNativeSession().getPolicy().getMaxTextMessageSize(); return this.sessionHelper.getTextMessageSizeLimit(getNativeSession());
} }
/**
* This method is a no-op for Jetty. As per {@link Session#getPolicy()}, the
* returned {@code WebSocketPolicy} is read-only and changing it has no effect.
*/
@Override @Override
public void setBinaryMessageSizeLimit(int messageSizeLimit) { public void setBinaryMessageSizeLimit(int messageSizeLimit) {
checkNativeSessionInitialized();
getNativeSession().getPolicy().setMaxBinaryMessageSize(messageSizeLimit);
} }
@Override @Override
public int getBinaryMessageSizeLimit() { public int getBinaryMessageSizeLimit() {
checkNativeSessionInitialized(); checkNativeSessionInitialized();
return getNativeSession().getPolicy().getMaxBinaryMessageSize(); return this.sessionHelper.getBinaryMessageSizeLimit(getNativeSession());
} }
@Override @Override
@ -178,22 +193,14 @@ public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
this.uri = session.getUpgradeRequest().getRequestURI(); this.uri = session.getUpgradeRequest().getRequestURI();
HttpHeaders headers = new HttpHeaders(); HttpHeaders headers = new HttpHeaders();
headers.putAll(session.getUpgradeRequest().getHeaders()); Map<String, List<String>> nativeHeaders = session.getUpgradeRequest().getHeaders();
if (!CollectionUtils.isEmpty(nativeHeaders)) {
headers.putAll(nativeHeaders);
}
this.headers = HttpHeaders.readOnlyHttpHeaders(headers); this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
this.acceptedProtocol = session.getUpgradeResponse().getAcceptedSubProtocol(); this.acceptedProtocol = session.getUpgradeResponse().getAcceptedSubProtocol();
this.extensions = this.sessionHelper.getExtensions(session);
List<ExtensionConfig> jettyExtensions = session.getUpgradeResponse().getExtensions();
if (!CollectionUtils.isEmpty(jettyExtensions)) {
List<WebSocketExtension> extensions = new ArrayList<>(jettyExtensions.size());
for (ExtensionConfig jettyExtension : jettyExtensions) {
extensions.add(new WebSocketExtension(jettyExtension.getName(), jettyExtension.getParameters()));
}
this.extensions = Collections.unmodifiableList(extensions);
}
else {
this.extensions = Collections.emptyList();
}
if (this.user == null) { if (this.user == null) {
this.user = session.getUpgradeRequest().getUserPrincipal(); this.user = session.getUpgradeRequest().getUserPrincipal();
@ -221,13 +228,8 @@ public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
getRemoteEndpoint().sendPong(message.getPayload()); getRemoteEndpoint().sendPong(message.getPayload());
} }
private RemoteEndpoint getRemoteEndpoint() throws IOException { private RemoteEndpoint getRemoteEndpoint() {
try { return getNativeSession().getRemote();
return getNativeSession().getRemote();
}
catch (WebSocketException ex) {
throw new IOException("Unable to obtain RemoteEndpoint in session " + getId(), ex);
}
} }
@Override @Override
@ -235,4 +237,90 @@ public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
getNativeSession().close(status.getCode(), status.getReason()); getNativeSession().close(status.getCode(), status.getReason());
} }
/**
* Encapsulate incompatible changes between Jetty 9.4 and 10.
*/
private interface SessionHelper {
List<WebSocketExtension> getExtensions(Session session);
int getTextMessageSizeLimit(Session session);
int getBinaryMessageSizeLimit(Session session);
}
private static class Jetty9SessionHelper implements SessionHelper {
@Override
public List<WebSocketExtension> getExtensions(Session session) {
List<ExtensionConfig> configs = session.getUpgradeResponse().getExtensions();
if (!CollectionUtils.isEmpty(configs)) {
List<WebSocketExtension> result = new ArrayList<>(configs.size());
for (ExtensionConfig config : configs) {
result.add(new WebSocketExtension(config.getName(), config.getParameters()));
}
return Collections.unmodifiableList(result);
}
return Collections.emptyList();
}
@Override
public int getTextMessageSizeLimit(Session session) {
return session.getPolicy().getMaxTextMessageSize();
}
@Override
public int getBinaryMessageSizeLimit(Session session) {
return session.getPolicy().getMaxBinaryMessageSize();
}
}
private static class Jetty10SessionHelper implements SessionHelper {
private static final Method getTextMessageSizeLimitMethod;
private static final Method getBinaryMessageSizeLimitMethod;
static {
try {
Class<?> type = loader.loadClass("org.eclipse.jetty.websocket.api.WebSocketPolicy");
getTextMessageSizeLimitMethod = type.getMethod("getMaxTextMessageSize");
getBinaryMessageSizeLimitMethod = type.getMethod("getMaxBinaryMessageSize");
}
catch (ClassNotFoundException | NoSuchMethodException ex) {
throw new IllegalStateException("No compatible Jetty version found", ex);
}
}
// TODO: Extension info can't be accessed without compiling against Jetty 10
// Jetty 10: org.eclipse.jetty.websocket.api.ExtensionConfig
// Jetty 9: org.eclipse.jetty.websocket.api.extensions.ExtensionConfig
@Override
public List<WebSocketExtension> getExtensions(Session session) {
return Collections.emptyList();
}
// TODO: WebSocketPolicy can't be accessed without compiling against Jetty 10 (class -> interface)
@Override
@SuppressWarnings("ConstantConditions")
public int getTextMessageSizeLimit(Session session) {
long result = (long) ReflectionUtils.invokeMethod(getTextMessageSizeLimitMethod, session.getPolicy());
Assert.state(result <= Integer.MAX_VALUE, "textMessageSizeLimit is larger than Integer.MAX_VALUE");
return (int) result;
}
@Override
@SuppressWarnings("ConstantConditions")
public int getBinaryMessageSizeLimit(Session session) {
long result = (long) ReflectionUtils.invokeMethod(getBinaryMessageSizeLimitMethod, session.getPolicy());
Assert.state(result <= Integer.MAX_VALUE, "binaryMessageSizeLimit is larger than Integer.MAX_VALUE");
return (int) result;
}
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2019 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.web.socket.client.jetty; package org.springframework.web.socket.client.jetty;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URI; import java.net.URI;
import java.security.Principal; import java.security.Principal;
import java.util.List; import java.util.List;
@ -34,11 +36,14 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor; import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask; import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.web.socket.WebSocketExtension; import org.springframework.web.socket.WebSocketExtension;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.jetty.Jetty10WebSocketHandlerAdapter;
import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter; import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter;
import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession; import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession;
import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter; import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter;
@ -60,11 +65,32 @@ import org.springframework.web.util.UriComponentsBuilder;
*/ */
public class JettyWebSocketClient extends AbstractWebSocketClient implements Lifecycle { public class JettyWebSocketClient extends AbstractWebSocketClient implements Lifecycle {
private static ClassLoader loader = JettyWebSocketClient.class.getClassLoader();
private static final boolean jetty10Present;
private static final Method setHeadersMethod;
static {
jetty10Present = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.client.JettyUpgradeListener", loader);
try {
setHeadersMethod = ClientUpgradeRequest.class.getMethod("setHeaders", Map.class);
}
catch (NoSuchMethodException ex) {
throw new IllegalStateException("No compatible Jetty version found", ex);
}
}
private final org.eclipse.jetty.websocket.client.WebSocketClient client; private final org.eclipse.jetty.websocket.client.WebSocketClient client;
@Nullable @Nullable
private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
private final UpgradeHelper upgradeHelper =
(jetty10Present ? new Jetty10UpgradeHelper() : new Jetty9UpgradeHelper());
/** /**
* Default constructor that creates an instance of * Default constructor that creates an instance of
@ -148,14 +174,15 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Lif
request.addExtensions(new WebSocketToJettyExtensionConfigAdapter(e)); request.addExtensions(new WebSocketToJettyExtensionConfigAdapter(e));
} }
headers.forEach(request::setHeader); // Jetty 9: setHeaders declared in UpgradeRequestAdapter base class
// Jetty 10: setHeaders declared in ClientUpgradeRequest
ReflectionUtils.invokeMethod(setHeadersMethod, request, headers);
Principal user = getUser(); Principal user = getUser();
final JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user); JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user);
final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession);
Callable<WebSocketSession> connectTask = () -> { Callable<WebSocketSession> connectTask = () -> {
Future<Session> future = this.client.connect(listener, uri, request); Future<Session> future = this.upgradeHelper.connect(this.client, uri, request, wsHandler, wsSession);
future.get(this.client.getConnectTimeout() + 2000, TimeUnit.MILLISECONDS); future.get(this.client.getConnectTimeout() + 2000, TimeUnit.MILLISECONDS);
return wsSession; return wsSession;
}; };
@ -179,4 +206,55 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Lif
return null; return null;
} }
/**
* Encapsulate incompatible changes between Jetty 9.4 and 10.
*/
private interface UpgradeHelper {
Future<Session> connect(WebSocketClient client, URI url, ClientUpgradeRequest request,
WebSocketHandler handler, JettyWebSocketSession session) throws IOException;
}
private static class Jetty9UpgradeHelper implements UpgradeHelper {
@Override
public Future<Session> connect(WebSocketClient client, URI url, ClientUpgradeRequest request,
WebSocketHandler handler, JettyWebSocketSession session) throws IOException {
JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(handler, session);
return client.connect(adapter, url, request);
}
}
private static class Jetty10UpgradeHelper implements UpgradeHelper {
// On Jetty 9 returns Future, on Jetty 10 returns CompletableFuture
private static final Method connectMethod;
static {
try {
Class<?> type = loader.loadClass("org.eclipse.jetty.websocket.client.WebSocketClient");
connectMethod = type.getMethod("connect", Object.class, URI.class, ClientUpgradeRequest.class);
}
catch (ClassNotFoundException | NoSuchMethodException ex) {
throw new IllegalStateException("No compatible Jetty version found", ex);
}
}
@Override
@SuppressWarnings({"ConstantConditions", "unchecked"})
public Future<Session> connect(WebSocketClient client, URI url, ClientUpgradeRequest request,
WebSocketHandler handler, JettyWebSocketSession session) {
Jetty10WebSocketHandlerAdapter adapter = new Jetty10WebSocketHandlerAdapter(handler, session);
// TODO: pass JettyUpgradeListener argument to set headers from HttpHeaders (like we do for Jetty 9)
// which would require a JDK Proxy since it is new in Jetty 10
return (Future<Session>) ReflectionUtils.invokeMethod(connectMethod, client, adapter, url, request);
}
}
} }

View File

@ -0,0 +1,165 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.server.jetty;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.Principal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.eclipse.jetty.websocket.server.HandshakeRFC6455;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.target.EmptyTargetSource;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.http.server.ServletServerHttpResponse;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.socket.WebSocketExtension;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.adapter.jetty.Jetty10WebSocketHandlerAdapter;
import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession;
import org.springframework.web.socket.server.HandshakeFailureException;
import org.springframework.web.socket.server.RequestUpgradeStrategy;
/**
* A {@link RequestUpgradeStrategy} for Jetty 10.
*
* @author Rossen Stoyanchev
* @since 5.3.4
*/
public class Jetty10RequestUpgradeStrategy implements RequestUpgradeStrategy {
private static final String[] SUPPORTED_VERSIONS = new String[] { String.valueOf(HandshakeRFC6455.VERSION) };
private static final Class<?> webSocketCreatorClass;
private static final Method getContainerMethod;
private static final Method upgradeMethod;
private static final Method setAcceptedSubProtocol;
static {
ClassLoader loader = Jetty10RequestUpgradeStrategy.class.getClassLoader();
try {
webSocketCreatorClass = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketCreator");
Class<?> type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer");
getContainerMethod = type.getMethod("getContainer", ServletContext.class);
upgradeMethod = ReflectionUtils.findMethod(type, "upgrade", (Class<?>[]) null);
type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse");
setAcceptedSubProtocol = type.getMethod("setAcceptedSubProtocol", String.class);
}
catch (Exception ex) {
throw new IllegalStateException("No compatible Jetty version found", ex);
}
}
@Override
public String[] getSupportedVersions() {
return SUPPORTED_VERSIONS;
}
@Override
public List<WebSocketExtension> getSupportedExtensions(ServerHttpRequest request) {
return Collections.emptyList();
}
@Override
public void upgrade(ServerHttpRequest request, ServerHttpResponse response,
@Nullable String selectedProtocol, List<WebSocketExtension> selectedExtensions,
@Nullable Principal user, WebSocketHandler handler, Map<String, Object> attributes)
throws HandshakeFailureException {
Assert.isInstanceOf(ServletServerHttpRequest.class, request, "ServletServerHttpRequest required");
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
ServletContext servletContext = servletRequest.getServletContext();
Assert.isInstanceOf(ServletServerHttpResponse.class, response, "ServletServerHttpResponse required");
HttpServletResponse servletResponse = ((ServletServerHttpResponse) response).getServletResponse();
JettyWebSocketSession session = new JettyWebSocketSession(attributes, user);
Jetty10WebSocketHandlerAdapter handlerAdapter = new Jetty10WebSocketHandlerAdapter(handler, session);
try {
Object creator = createJettyWebSocketCreator(handlerAdapter, selectedProtocol);
Object container = ReflectionUtils.invokeMethod(getContainerMethod, null, servletContext);
ReflectionUtils.invokeMethod(upgradeMethod, container, creator, servletRequest, servletResponse);
}
catch (UndeclaredThrowableException ex) {
throw new HandshakeFailureException("Failed to upgrade", ex.getUndeclaredThrowable());
}
catch (Exception ex) {
throw new HandshakeFailureException("Failed to upgrade", ex);
}
}
private static Object createJettyWebSocketCreator(
Jetty10WebSocketHandlerAdapter adapter, @Nullable String protocol) {
ProxyFactory factory = new ProxyFactory(EmptyTargetSource.INSTANCE);
factory.addInterface(webSocketCreatorClass);
factory.addAdvice(new WebSocketCreatorInterceptor(adapter, protocol));
return factory.getProxy();
}
/**
* Proxy for a JettyWebSocketCreator to supply the WebSocket handler and set the sub-protocol.
*/
private static class WebSocketCreatorInterceptor implements MethodInterceptor {
private final Jetty10WebSocketHandlerAdapter adapter;
@Nullable
private final String protocol;
public WebSocketCreatorInterceptor(
Jetty10WebSocketHandlerAdapter adapter, @Nullable String protocol) {
this.adapter = adapter;
this.protocol = protocol;
}
@Nullable
@Override
public Object invoke(@NonNull MethodInvocation invocation) {
if (this.protocol != null) {
ReflectionUtils.invokeMethod(
setAcceptedSubProtocol, invocation.getArguments()[2], this.protocol);
}
return this.adapter;
}
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -71,9 +71,11 @@ import org.springframework.web.socket.server.RequestUpgradeStrategy;
*/ */
public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle { public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {
private static final boolean tomcatWsPresent;
private static final boolean jettyWsPresent; private static final boolean jettyWsPresent;
private static final boolean tomcatWsPresent; private static final boolean jetty10WsPresent;
private static final boolean undertowWsPresent; private static final boolean undertowWsPresent;
@ -85,10 +87,12 @@ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Life
static { static {
ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader(); ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();
jettyWsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);
tomcatWsPresent = ClassUtils.isPresent( tomcatWsPresent = ClassUtils.isPresent(
"org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader); "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);
jetty10WsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);
jettyWsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);
undertowWsPresent = ClassUtils.isPresent( undertowWsPresent = ClassUtils.isPresent(
"io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader); "io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);
glassfishWsPresent = ClassUtils.isPresent( glassfishWsPresent = ClassUtils.isPresent(
@ -97,7 +101,6 @@ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Life
"weblogic.websocket.tyrus.TyrusServletWriter", classLoader); "weblogic.websocket.tyrus.TyrusServletWriter", classLoader);
websphereWsPresent = ClassUtils.isPresent( websphereWsPresent = ClassUtils.isPresent(
"com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader); "com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);
} }
@ -137,6 +140,9 @@ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Life
else if (jettyWsPresent) { else if (jettyWsPresent) {
className = "org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy"; className = "org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy";
} }
else if (jetty10WsPresent) {
className = "org.springframework.web.socket.server.jetty.Jetty10RequestUpgradeStrategy";
}
else if (undertowWsPresent) { else if (undertowWsPresent) {
className = "org.springframework.web.socket.server.standard.UndertowRequestUpgradeStrategy"; className = "org.springframework.web.socket.server.standard.UndertowRequestUpgradeStrategy";
} }