From a14161f0ca43996e10fa3915d959e2c4cc61bab7 Mon Sep 17 00:00:00 2001 From: Phillip Webb Date: Tue, 23 Apr 2013 12:00:06 -0700 Subject: [PATCH] JettyRequestUpgradeStrategy --- build.gradle | 5 + .../server/DefaultHandshakeHandler.java | 5 + .../support/JettyRequestUpgradeStrategy.java | 305 ++++++++++++++++++ 3 files changed, 315 insertions(+) create mode 100644 spring-websocket/src/main/java/org/springframework/websocket/server/support/JettyRequestUpgradeStrategy.java diff --git a/build.gradle b/build.gradle index 57b4a48509..72eebf8059 100644 --- a/build.gradle +++ b/build.gradle @@ -528,6 +528,11 @@ project("spring-websocket") { optional("org.glassfish.tyrus:tyrus-websocket-core:1.0-SNAPSHOT") optional("org.glassfish.tyrus:tyrus-container-servlet:1.0-SNAPSHOT") + optional("org.eclipse.jetty:jetty-webapp:9.0.1.v20130408") { + exclude group: "org.eclipse.jetty.orbit", module: "javax.servlet" + } + optional("org.eclipse.jetty.websocket:websocket-server:9.0.1.v20130408") + optional("com.fasterxml.jackson.core:jackson-databind:2.0.1") // required for SockJS support currently } diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/DefaultHandshakeHandler.java b/spring-websocket/src/main/java/org/springframework/websocket/server/DefaultHandshakeHandler.java index 3ed1ccb441..6ae721819c 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/server/DefaultHandshakeHandler.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/DefaultHandshakeHandler.java @@ -214,6 +214,8 @@ public class DefaultHandshakeHandler implements HandshakeHandler { private static final boolean glassfishWebSocketPresent = ClassUtils.isPresent( "org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", DefaultHandshakeHandler.class.getClassLoader()); + private static final boolean jettyWebSocketPresent = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.server.UpgradeContext", DefaultHandshakeHandler.class.getClassLoader()); private RequestUpgradeStrategy create() { String className; @@ -223,6 +225,9 @@ public class DefaultHandshakeHandler implements HandshakeHandler { else if (glassfishWebSocketPresent) { className = "org.springframework.websocket.server.support.GlassfishRequestUpgradeStrategy"; } + else if (jettyWebSocketPresent) { + className = "org.springframework.websocket.server.support.JettyRequestUpgradeStrategy"; + } else { throw new IllegalStateException("No suitable " + RequestUpgradeStrategy.class.getSimpleName()); } diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/support/JettyRequestUpgradeStrategy.java b/spring-websocket/src/main/java/org/springframework/websocket/server/support/JettyRequestUpgradeStrategy.java new file mode 100644 index 0000000000..a00ee912ef --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/support/JettyRequestUpgradeStrategy.java @@ -0,0 +1,305 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.websocket.server.support; + +import java.io.IOException; +import java.net.URI; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.UpgradeRequest; +import org.eclipse.jetty.websocket.api.UpgradeResponse; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.server.HandshakeRFC6455; +import org.eclipse.jetty.websocket.server.ServletWebSocketRequest; +import org.eclipse.jetty.websocket.server.WebSocketServerFactory; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.http.server.ServletServerHttpRequest; +import org.springframework.http.server.ServletServerHttpResponse; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; +import org.springframework.websocket.BinaryMessage; +import org.springframework.websocket.BinaryMessageHandler; +import org.springframework.websocket.CloseStatus; +import org.springframework.websocket.HandlerProvider; +import org.springframework.websocket.TextMessage; +import org.springframework.websocket.TextMessageHandler; +import org.springframework.websocket.WebSocketHandler; +import org.springframework.websocket.WebSocketMessage; +import org.springframework.websocket.WebSocketSession; +import org.springframework.websocket.server.RequestUpgradeStrategy; + +/** + * {@link RequestUpgradeStrategy} for use with Jetty. Based on Jetty's internal + * {@code org.eclipse.jetty.websocket.server.WebSocketHandler} class. + * + * @author Phillip Webb + */ +public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy { + + private static Log logger = LogFactory.getLog(JettyRequestUpgradeStrategy.class); + + // FIXME jetty has options, timeouts etc. Do we need a common abstraction + + // FIXME need a way for someone to plug their own RequestUpgradeStrategy or override + // Jetty settings + + // FIXME when to call factory.cleanup(); + + private static final String HANDLER_PROVIDER = JettyRequestUpgradeStrategy.class.getName() + + ".HANDLER_PROVIDER"; + + private WebSocketServerFactory factory; + + + public JettyRequestUpgradeStrategy() { + this.factory = new WebSocketServerFactory(); + this.factory.setCreator(new WebSocketCreator() { + @Override + @SuppressWarnings("unchecked") + public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) { + Assert.isInstanceOf(ServletWebSocketRequest.class, req); + ServletWebSocketRequest servletRequest = (ServletWebSocketRequest) req; + HandlerProvider handlerProvider = (HandlerProvider) servletRequest.getServletAttributes().get( + HANDLER_PROVIDER); + return new WebSocketHandlerAdapter(handlerProvider); + } + }); + try { + this.factory.init(); + } + catch (Exception ex) { + throw new IllegalStateException(ex); + } + } + + @Override + public String[] getSupportedVersions() { + return new String[] { String.valueOf(HandshakeRFC6455.VERSION) }; + } + + @Override + public void upgrade(ServerHttpRequest request, ServerHttpResponse response, + String selectedProtocol, HandlerProvider handlerProvider) + throws Exception { + Assert.isInstanceOf(ServletServerHttpRequest.class, request); + Assert.isInstanceOf(ServletServerHttpResponse.class, response); + upgrade(((ServletServerHttpRequest) request).getServletRequest(), + ((ServletServerHttpResponse) response).getServletResponse(), + selectedProtocol, handlerProvider); + } + + private void upgrade(HttpServletRequest request, HttpServletResponse response, + String selectedProtocol, final HandlerProvider handlerProvider) + throws Exception { + request.setAttribute(HANDLER_PROVIDER, handlerProvider); + Assert.state(factory.isUpgradeRequest(request, response), "Not a suitable WebSocket upgrade request"); + Assert.state(factory.acceptWebSocket(request, response), "Unable to accept WebSocket"); + } + + + /** + * Adapts Spring's {@link WebSocketHandler} to Jetty's {@link WebSocketListener}. + */ + private static class WebSocketHandlerAdapter implements WebSocketListener { + + private final HandlerProvider provider; + + private WebSocketHandler handler; + + private WebSocketSession session; + + + public WebSocketHandlerAdapter(HandlerProvider provider) { + Assert.notNull(provider, "Provider must not be null"); + Assert.isAssignable(WebSocketHandler.class, provider.getHandlerType()); + this.provider = provider; + } + + + @Override + public void onWebSocketConnect(Session session) { + Assert.state(this.session == null, "WebSocket already open"); + try { + this.session = new WebSocketSessionAdapter(session); + if (logger.isDebugEnabled()) { + logger.debug("Client connected, WebSocket session id=" + + this.session.getId() + ", uri=" + this.session.getURI()); + } + this.handler = this.provider.getHandler(); + this.handler.afterConnectionEstablished(this.session); + } + catch (Exception ex) { + try { + // FIXME revisit after error handling + onWebSocketError(ex); + } + finally { + this.session = null; + this.handler = null; + } + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + Assert.state(this.session != null, "WebSocket not open"); + try { + CloseStatus closeStatus = new CloseStatus(statusCode, reason); + if (logger.isDebugEnabled()) { + logger.debug("Client disconnected, WebSocket session id=" + + this.session.getId() + ", " + closeStatus); + } + this.handler.afterConnectionClosed(closeStatus, this.session); + } + catch (Exception ex) { + onWebSocketError(ex); + } + finally { + try { + if (this.handler != null) { + this.provider.destroy(this.handler); + } + } + finally { + this.session = null; + this.handler = null; + } + } + } + + @Override + public void onWebSocketText(String payload) { + try { + TextMessage message = new TextMessage(payload); + if (logger.isTraceEnabled()) { + logger.trace("Received message for WebSocket session id=" + + this.session.getId() + ": " + message); + } + if (this.handler instanceof TextMessageHandler) { + ((TextMessageHandler) this.handler).handleTextMessage(message, this.session); + } + } + catch(Exception ex) { + ex.printStackTrace(); //FIXME + } + } + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) { + try { + BinaryMessage message = new BinaryMessage(payload, offset, len); + if (logger.isTraceEnabled()) { + logger.trace("Received binary data for WebSocket session id=" + + this.session.getId() + ": " + message); + } + if (this.handler instanceof BinaryMessageHandler) { + ((BinaryMessageHandler) this.handler).handleBinaryMessage(message, + this.session); + } + } + catch(Exception ex) { + ex.printStackTrace(); //FIXME + } + } + + @Override + public void onWebSocketError(Throwable cause) { + try { + this.handler.handleError(cause, this.session); + } + catch (Throwable ex) { + // FIXME exceptions + logger.error("Error for WebSocket session id=" + this.session.getId(), + cause); + } + } + } + + + /** + * Adapts Jetty's {@link Session} to Spring's {@link WebSocketSession}. + */ + private static class WebSocketSessionAdapter implements WebSocketSession { + + private Session session; + + + public WebSocketSessionAdapter(Session session) { + this.session = session; + } + + + @Override + public String getId() { + return ObjectUtils.getIdentityHexString(this.session); + } + + @Override + public boolean isOpen() { + return this.session.isOpen(); + } + + @Override + public boolean isSecure() { + return this.session.isSecure(); + } + + @Override + public URI getURI() { + return this.session.getUpgradeRequest().getRequestURI(); + } + + @Override + public void sendMessage(WebSocketMessage message) throws Exception { + if (message instanceof BinaryMessage) { + sendMessage((BinaryMessage) message); + } + else if (message instanceof TextMessage) { + sendMessage((TextMessage) message); + } + else { + throw new IllegalArgumentException("Unsupported message type"); + } + } + + private void sendMessage(BinaryMessage message) throws Exception { + this.session.getRemote().sendBytes(message.getPayload()); + } + + private void sendMessage(TextMessage message) throws Exception { + this.session.getRemote().sendString(message.getPayload()); + } + + @Override + public void close() throws IOException { + this.session.close(); + } + + @Override + public void close(CloseStatus status) throws IOException { + this.session.close(status.getCode(), status.getReason()); + } + } + +}