diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/AbstractSockJsSession.java b/spring-websocket/src/main/java/org/springframework/sockjs/AbstractSockJsSession.java index 9f3248b8e08..9b1e07c99f9 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/AbstractSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/AbstractSockJsSession.java @@ -27,6 +27,7 @@ import org.springframework.websocket.HandlerProvider; import org.springframework.websocket.TextMessage; import org.springframework.websocket.WebSocketHandler; import org.springframework.websocket.WebSocketSession; +import org.springframework.websocket.adapter.WebSocketHandlerInvoker; /** @@ -42,9 +43,7 @@ public abstract class AbstractSockJsSession implements WebSocketSession { private final String sessionId; - private final HandlerProvider handlerProvider; - - private WebSocketHandler handler; + private WebSocketHandlerInvoker handler; private State state = State.NEW; @@ -62,7 +61,7 @@ public abstract class AbstractSockJsSession implements WebSocketSession { Assert.notNull(sessionId, "sessionId is required"); Assert.notNull(handlerProvider, "handlerProvider is required"); this.sessionId = sessionId; - this.handlerProvider = handlerProvider; + this.handler = new WebSocketHandlerInvoker(handlerProvider).setLogger(logger); } public String getId() { @@ -124,42 +123,7 @@ public abstract class AbstractSockJsSession implements WebSocketSession { public void delegateConnectionEstablished() { this.state = State.OPEN; - this.handler = handlerProvider.getHandler(); - try { - this.handler.afterConnectionEstablished(this); - } - catch (Throwable ex) { - tryCloseWithError(ex, null); - } - } - - /** - * Close due to unhandled runtime error from WebSocketHandler. - * @param closeStatus TODO - */ - private void tryCloseWithError(Throwable ex, CloseStatus closeStatus) { - logger.error("Unhandled error for " + this, ex); - try { - closeStatus = (closeStatus != null) ? closeStatus : CloseStatus.SERVER_ERROR; - close(closeStatus); - } - catch (Throwable t) { - destroyHandler(); - } - } - - private void destroyHandler() { - try { - if (this.handler != null) { - this.handlerProvider.destroy(this.handler); - } - } - catch (Throwable t) { - logger.warn("Error while destroying handler", t); - } - finally { - this.handler = null; - } + this.handler.afterConnectionEstablished(this); } /** @@ -167,27 +131,17 @@ public abstract class AbstractSockJsSession implements WebSocketSession { */ protected void tryCloseWithSockJsTransportError(Throwable ex, CloseStatus closeStatus) { delegateError(ex); - tryCloseWithError(ex, closeStatus); + this.handler.tryCloseWithError(this, ex, closeStatus); } public void delegateMessages(String[] messages) { - try { - for (String message : messages) { - this.handler.handleTextMessage(new TextMessage(message), this); - } - } - catch (Throwable ex) { - tryCloseWithError(ex, null); + for (String message : messages) { + this.handler.handleTextMessage(new TextMessage(message), this); } } public void delegateError(Throwable ex) { - try { - this.handler.handleTransportError(ex, this); - } - catch (Throwable t) { - tryCloseWithError(t, null); - } + this.handler.handleTransportError(ex, this); } /** @@ -206,12 +160,7 @@ public abstract class AbstractSockJsSession implements WebSocketSession { } finally { this.state = State.CLOSED; - try { - this.handler.afterConnectionClosed(status, this); - } - finally { - destroyHandler(); - } + this.handler.afterConnectionClosed(status, this); } } } @@ -241,12 +190,7 @@ public abstract class AbstractSockJsSession implements WebSocketSession { } finally { this.state = State.CLOSED; - try { - this.handler.afterConnectionClosed(status, this); - } - finally { - destroyHandler(); - } + this.handler.afterConnectionClosed(status, this); } } } diff --git a/spring-websocket/src/main/java/org/springframework/websocket/WebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/websocket/WebSocketHandler.java index a16ab765bf5..5984d49ae0c 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/WebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/WebSocketHandler.java @@ -29,11 +29,6 @@ public interface WebSocketHandler { */ void afterConnectionEstablished(WebSocketSession session); - /** - * A WebSocket connection has been closed. - */ - void afterConnectionClosed(CloseStatus closeStatus, WebSocketSession session); - /** * Handle an incoming text message. */ @@ -49,4 +44,9 @@ public interface WebSocketHandler { */ void handleTransportError(Throwable exception, WebSocketSession session); + /** + * A WebSocket connection has been closed. + */ + void afterConnectionClosed(CloseStatus closeStatus, WebSocketSession session); + } diff --git a/spring-websocket/src/main/java/org/springframework/websocket/adapter/JettyWebSocketListenerAdapter.java b/spring-websocket/src/main/java/org/springframework/websocket/adapter/JettyWebSocketListenerAdapter.java new file mode 100644 index 00000000000..2ec1ec6f569 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/websocket/adapter/JettyWebSocketListenerAdapter.java @@ -0,0 +1,80 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.websocket.adapter; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.springframework.util.Assert; +import org.springframework.websocket.BinaryMessage; +import org.springframework.websocket.CloseStatus; +import org.springframework.websocket.HandlerProvider; +import org.springframework.websocket.TextMessage; +import org.springframework.websocket.WebSocketHandler; +import org.springframework.websocket.WebSocketSession; + +/** + * Adapts Spring's {@link WebSocketHandler} to Jetty's {@link WebSocketListener}. + * + * @author Phillip Webb + * @since 4.0 + */ +public class JettyWebSocketListenerAdapter implements WebSocketListener { + + private static Log logger = LogFactory.getLog(JettyWebSocketListenerAdapter.class); + + private final WebSocketHandler handler; + + private WebSocketSession wsSession; + + + public JettyWebSocketListenerAdapter(HandlerProvider provider) { + Assert.notNull(provider, "provider is required"); + this.handler = new WebSocketHandlerInvoker(provider).setLogger(logger); + } + + + @Override + public void onWebSocketConnect(Session session) { + this.wsSession = new JettyWebSocketSessionAdapter(session); + this.handler.afterConnectionEstablished(this.wsSession); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + CloseStatus closeStatus = new CloseStatus(statusCode, reason); + this.handler.afterConnectionClosed(closeStatus, this.wsSession); + } + + @Override + public void onWebSocketText(String payload) { + TextMessage message = new TextMessage(payload); + this.handler.handleTextMessage(message, this.wsSession); + } + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) { + BinaryMessage message = new BinaryMessage(payload, offset, len); + this.handler.handleBinaryMessage(message, this.wsSession); + } + + @Override + public void onWebSocketError(Throwable cause) { + this.handler.handleTransportError(cause, this.wsSession); + } +} diff --git a/spring-websocket/src/main/java/org/springframework/websocket/adapter/JettyWebSocketSessionAdapter.java b/spring-websocket/src/main/java/org/springframework/websocket/adapter/JettyWebSocketSessionAdapter.java new file mode 100644 index 00000000000..ef23f92427d --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/websocket/adapter/JettyWebSocketSessionAdapter.java @@ -0,0 +1,98 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.websocket.adapter; + +import java.io.IOException; +import java.net.URI; + +import org.eclipse.jetty.websocket.api.Session; +import org.springframework.util.ObjectUtils; +import org.springframework.websocket.BinaryMessage; +import org.springframework.websocket.CloseStatus; +import org.springframework.websocket.TextMessage; +import org.springframework.websocket.WebSocketMessage; +import org.springframework.websocket.WebSocketSession; + + +/** + * Adapts Jetty's {@link Session} to Spring's {@link WebSocketSession}. + * + * @author Phillip Webb + * @since 4.0 + */ +public class JettyWebSocketSessionAdapter implements WebSocketSession { + + private Session session; + + + public JettyWebSocketSessionAdapter(Session session) { + this.session = session; + } + + + @Override + public String getId() { + return ObjectUtils.getIdentityHexString(this.session); + } + + @Override + public boolean isOpen() { + return this.session.isOpen(); + } + + @Override + public boolean isSecure() { + return this.session.isSecure(); + } + + @Override + public URI getURI() { + return this.session.getUpgradeRequest().getRequestURI(); + } + + @Override + public void sendMessage(WebSocketMessage message) throws IOException { + if (message instanceof BinaryMessage) { + sendMessage((BinaryMessage) message); + } + else if (message instanceof TextMessage) { + sendMessage((TextMessage) message); + } + else { + throw new IllegalArgumentException("Unsupported message type"); + } + } + + private void sendMessage(BinaryMessage message) throws IOException { + this.session.getRemote().sendBytes(message.getPayload()); + } + + private void sendMessage(TextMessage message) throws IOException { + this.session.getRemote().sendString(message.getPayload()); + } + + @Override + public void close() throws IOException { + this.session.close(); + } + + @Override + public void close(CloseStatus status) throws IOException { + this.session.close(status.getCode(), status.getReason()); + } + +} \ No newline at end of file diff --git a/spring-websocket/src/main/java/org/springframework/websocket/adapter/StandardEndpointAdapter.java b/spring-websocket/src/main/java/org/springframework/websocket/adapter/StandardEndpointAdapter.java new file mode 100644 index 00000000000..3ce78ab01f9 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/websocket/adapter/StandardEndpointAdapter.java @@ -0,0 +1,114 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.websocket.adapter; + +import java.nio.ByteBuffer; + +import javax.websocket.CloseReason; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.MessageHandler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.util.Assert; +import org.springframework.websocket.BinaryMessage; +import org.springframework.websocket.CloseStatus; +import org.springframework.websocket.HandlerProvider; +import org.springframework.websocket.PartialMessageHandler; +import org.springframework.websocket.TextMessage; +import org.springframework.websocket.WebSocketHandler; +import org.springframework.websocket.WebSocketSession; + + +/** + * An {@link Endpoint} that delegates to a {@link WebSocketHandler}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class StandardEndpointAdapter extends Endpoint { + + private static Log logger = LogFactory.getLog(StandardEndpointAdapter.class); + + private final WebSocketHandler handler; + + private final Class handlerClass; + + private WebSocketSession wsSession; + + + + public StandardEndpointAdapter(HandlerProvider provider) { + Assert.notNull(provider, "provider is required"); + this.handler = new WebSocketHandlerInvoker(provider).setLogger(logger); + this.handlerClass= provider.getHandlerType(); + } + + + @Override + public void onOpen(final javax.websocket.Session session, EndpointConfig config) { + + session.addMessageHandler(new MessageHandler.Whole() { + @Override + public void onMessage(String message) { + handleTextMessage(session, message); + } + }); + if (PartialMessageHandler.class.isAssignableFrom(this.handlerClass)) { + session.addMessageHandler(new MessageHandler.Partial() { + @Override + public void onMessage(ByteBuffer messagePart, boolean isLast) { + handleBinaryMessage(session, messagePart, isLast); + } + }); + } + else { + session.addMessageHandler(new MessageHandler.Whole() { + @Override + public void onMessage(ByteBuffer message) { + handleBinaryMessage(session, message, true); + } + }); + } + + this.wsSession = new StandardWebSocketSessionAdapter(session); + this.handler.afterConnectionEstablished(this.wsSession); + } + + private void handleTextMessage(javax.websocket.Session session, String payload) { + TextMessage message = new TextMessage(payload); + this.handler.handleTextMessage(message, this.wsSession); + } + + private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload, boolean isLast) { + BinaryMessage message = new BinaryMessage(payload, isLast); + this.handler.handleBinaryMessage(message, this.wsSession); + } + + @Override + public void onClose(javax.websocket.Session session, CloseReason reason) { + CloseStatus closeStatus = new CloseStatus(reason.getCloseCode().getCode(), reason.getReasonPhrase()); + this.handler.afterConnectionClosed(closeStatus, this.wsSession); + } + + @Override + public void onError(javax.websocket.Session session, Throwable exception) { + this.handler.handleTransportError(exception, this.wsSession); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/StandardWebSocketSession.java b/spring-websocket/src/main/java/org/springframework/websocket/adapter/StandardWebSocketSessionAdapter.java similarity index 92% rename from spring-websocket/src/main/java/org/springframework/websocket/endpoint/StandardWebSocketSession.java rename to spring-websocket/src/main/java/org/springframework/websocket/adapter/StandardWebSocketSessionAdapter.java index e59ee859422..30bc063baf2 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/StandardWebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/adapter/StandardWebSocketSessionAdapter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.websocket.endpoint; +package org.springframework.websocket.adapter; import java.io.IOException; import java.net.URI; @@ -39,14 +39,14 @@ import org.springframework.websocket.WebSocketSession; * @author Rossen Stoyanchev * @since 4.0 */ -public class StandardWebSocketSession implements WebSocketSession { +public class StandardWebSocketSessionAdapter implements WebSocketSession { - private static Log logger = LogFactory.getLog(StandardWebSocketSession.class); + private static Log logger = LogFactory.getLog(StandardWebSocketSessionAdapter.class); private final javax.websocket.Session session; - public StandardWebSocketSession(javax.websocket.Session session) { + public StandardWebSocketSessionAdapter(javax.websocket.Session session) { Assert.notNull(session, "session is required"); this.session = session; } diff --git a/spring-websocket/src/main/java/org/springframework/websocket/adapter/WebSocketHandlerInvoker.java b/spring-websocket/src/main/java/org/springframework/websocket/adapter/WebSocketHandlerInvoker.java new file mode 100644 index 00000000000..45abc18a524 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/websocket/adapter/WebSocketHandlerInvoker.java @@ -0,0 +1,161 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.websocket.adapter; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.util.Assert; +import org.springframework.websocket.BinaryMessage; +import org.springframework.websocket.CloseStatus; +import org.springframework.websocket.HandlerProvider; +import org.springframework.websocket.TextMessage; +import org.springframework.websocket.WebSocketHandler; +import org.springframework.websocket.WebSocketSession; + +/** + * A class for managing and delegating to a {@link WebSocketHandler} instance, applying + * initialization and destruction as necessary at the start and end of the WebSocket + * session, ensuring that any unhandled exceptions from its methods are caught and handled + * by closing the session, and also adding uniform logging. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class WebSocketHandlerInvoker implements WebSocketHandler { + + private Log logger = LogFactory.getLog(WebSocketHandlerInvoker.class); + + private final HandlerProvider handlerProvider; + + private WebSocketHandler handler; + + private final AtomicInteger sessionCount = new AtomicInteger(0); + + + public WebSocketHandlerInvoker(HandlerProvider handlerProvider) { + this.handlerProvider = handlerProvider; + } + + public WebSocketHandlerInvoker setLogger(Log logger) { + this.logger = logger; + return this; + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) { + if (logger.isDebugEnabled()) { + logger.debug("Connection established, " + session + ", uri=" + session.getURI()); + } + try { + Assert.isTrue(this.sessionCount.compareAndSet(0, 1), "Unexpected new session"); + + this.handler = this.handlerProvider.getHandler(); + this.handler.afterConnectionEstablished(session); + } + catch (Throwable ex) { + tryCloseWithError(session, ex); + } + } + + public void tryCloseWithError(WebSocketSession session, Throwable ex) { + tryCloseWithError(session, ex, null); + } + + public void tryCloseWithError(WebSocketSession session, Throwable ex, CloseStatus status) { + logger.error("Unhandled error for " + session, ex); + if (session.isOpen()) { + try { + session.close(CloseStatus.SERVER_ERROR); + } + catch (Throwable t) { + destroyHandler(); + } + } + } + + private void destroyHandler() { + try { + if (this.handler != null) { + this.handlerProvider.destroy(this.handler); + } + } + catch (Throwable t) { + logger.warn("Error while destroying handler", t); + } + finally { + this.handler = null; + } + } + + @Override + public void handleTextMessage(TextMessage message, WebSocketSession session) { + if (logger.isTraceEnabled()) { + logger.trace("Received text message for " + session + ": " + message); + } + try { + this.handler.handleTextMessage(message, session); + } + catch (Throwable ex) { + tryCloseWithError(session,ex); + } + } + + @Override + public void handleBinaryMessage(BinaryMessage message, WebSocketSession session) { + if (logger.isTraceEnabled()) { + logger.trace("Received binary message for " + session); + } + try { + this.handler.handleBinaryMessage(message, session); + } + catch (Throwable ex) { + tryCloseWithError(session, ex); + } + } + + @Override + public void handleTransportError(Throwable exception, WebSocketSession session) { + if (logger.isDebugEnabled()) { + logger.debug("Transport error for " + session, exception); + } + try { + this.handler.handleTransportError(exception, session); + } + catch (Throwable ex) { + tryCloseWithError(session, ex); + } + } + + @Override + public void afterConnectionClosed(CloseStatus closeStatus, WebSocketSession session) { + if (logger.isDebugEnabled()) { + logger.debug("Connection closed for " + session + ", " + closeStatus); + } + try { + this.handler.afterConnectionClosed(closeStatus, session); + } + catch (Throwable ex) { + logger.error("Unhandled error for " + this, ex); + } + finally { + this.handlerProvider.destroy(this.handler); + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/package-info.java b/spring-websocket/src/main/java/org/springframework/websocket/adapter/package-info.java similarity index 76% rename from spring-websocket/src/main/java/org/springframework/websocket/endpoint/package-info.java rename to spring-websocket/src/main/java/org/springframework/websocket/adapter/package-info.java index 64cfefd85e6..695869a5ffc 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/package-info.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/adapter/package-info.java @@ -15,8 +15,8 @@ */ /** - * Classes for use with the standard Java WebSocket endpoints from both client and - * server code. + * Adapters for the {@link org.springframework.websocket.WebSocketHandler} and + * {@link org.springframework.websocket.WebSocketSession} contracts. */ -package org.springframework.websocket.endpoint; +package org.springframework.websocket.adapter; diff --git a/spring-websocket/src/main/java/org/springframework/websocket/client/endpoint/StandardWebSocketClient.java b/spring-websocket/src/main/java/org/springframework/websocket/client/endpoint/StandardWebSocketClient.java index 25519dd2ea9..ca0e103efbb 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/client/endpoint/StandardWebSocketClient.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/client/endpoint/StandardWebSocketClient.java @@ -35,10 +35,10 @@ import org.springframework.web.util.UriComponentsBuilder; import org.springframework.websocket.HandlerProvider; import org.springframework.websocket.WebSocketHandler; import org.springframework.websocket.WebSocketSession; +import org.springframework.websocket.adapter.StandardWebSocketSessionAdapter; +import org.springframework.websocket.adapter.StandardEndpointAdapter; import org.springframework.websocket.client.WebSocketClient; import org.springframework.websocket.client.WebSocketConnectFailureException; -import org.springframework.websocket.endpoint.StandardWebSocketSession; -import org.springframework.websocket.endpoint.WebSocketHandlerEndpoint; import org.springframework.websocket.support.SimpleHandlerProvider; /** @@ -78,7 +78,7 @@ public class StandardWebSocketClient implements WebSocketClient { public WebSocketSession doHandshake(HandlerProvider handler, final HttpHeaders httpHeaders, URI uri) throws WebSocketConnectFailureException { - Endpoint endpoint = new WebSocketHandlerEndpoint(handler); + Endpoint endpoint = new StandardEndpointAdapter(handler); ClientEndpointConfig.Builder configBuidler = ClientEndpointConfig.Builder.create(); if (httpHeaders != null) { @@ -100,7 +100,7 @@ public class StandardWebSocketClient implements WebSocketClient { try { Session session = this.webSocketContainer.connectToServer(endpoint, configBuidler.build(), uri); - return new StandardWebSocketSession(session); + return new StandardWebSocketSessionAdapter(session); } catch (Exception e) { throw new WebSocketConnectFailureException("Failed to connect to " + uri, e); diff --git a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/WebSocketHandlerEndpoint.java b/spring-websocket/src/main/java/org/springframework/websocket/endpoint/WebSocketHandlerEndpoint.java deleted file mode 100644 index 528ac34ff1e..00000000000 --- a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/WebSocketHandlerEndpoint.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.websocket.endpoint; - -import java.util.concurrent.atomic.AtomicInteger; - -import javax.websocket.CloseReason; -import javax.websocket.Endpoint; -import javax.websocket.EndpointConfig; -import javax.websocket.MessageHandler; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.springframework.util.Assert; -import org.springframework.websocket.BinaryMessage; -import org.springframework.websocket.CloseStatus; -import org.springframework.websocket.HandlerProvider; -import org.springframework.websocket.PartialMessageHandler; -import org.springframework.websocket.TextMessage; -import org.springframework.websocket.WebSocketHandler; -import org.springframework.websocket.WebSocketSession; - - -/** - * An {@link Endpoint} that delegates to a {@link WebSocketHandler}. - * - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class WebSocketHandlerEndpoint extends Endpoint { - - private static Log logger = LogFactory.getLog(WebSocketHandlerEndpoint.class); - - private final HandlerProvider handlerProvider; - - private WebSocketHandler handler; - - private WebSocketSession webSocketSession; - - private final AtomicInteger sessionCount = new AtomicInteger(0); - - - public WebSocketHandlerEndpoint(HandlerProvider handlerProvider) { - Assert.notNull(handlerProvider, "handlerProvider is required"); - this.handlerProvider = handlerProvider; - } - - - @Override - public void onOpen(final javax.websocket.Session session, EndpointConfig config) { - - Assert.isTrue(this.sessionCount.compareAndSet(0, 1), "Unexpected connection"); - - if (logger.isDebugEnabled()) { - logger.debug("Connection established, javax.websocket.Session id=" - + session.getId() + ", uri=" + session.getRequestURI()); - } - - this.webSocketSession = new StandardWebSocketSession(session); - this.handler = handlerProvider.getHandler(); - - session.addMessageHandler(new MessageHandler.Whole() { - @Override - public void onMessage(String message) { - handleTextMessage(session, message); - } - }); - if (this.handler instanceof PartialMessageHandler) { - session.addMessageHandler(new MessageHandler.Partial() { - @Override - public void onMessage(byte[] messagePart, boolean isLast) { - handleBinaryMessage(session, messagePart, isLast); - } - }); - } - else { - session.addMessageHandler(new MessageHandler.Whole() { - @Override - public void onMessage(byte[] message) { - handleBinaryMessage(session, message, true); - } - }); - } - - try { - this.handler.afterConnectionEstablished(this.webSocketSession); - } - catch (Throwable ex) { - tryCloseWithError(ex); - } - } - - private void tryCloseWithError(Throwable ex) { - logger.error("Unhandled error for " + this.webSocketSession, ex); - if (this.webSocketSession.isOpen()) { - try { - this.webSocketSession.close(CloseStatus.SERVER_ERROR); - } - catch (Throwable t) { - destroyHandler(); - } - } - } - - private void destroyHandler() { - try { - if (this.handler != null) { - this.handlerProvider.destroy(this.handler); - } - } - catch (Throwable t) { - logger.warn("Error while destroying handler", t); - } - finally { - this.webSocketSession = null; - this.handler = null; - } - } - - private void handleTextMessage(javax.websocket.Session session, String message) { - if (logger.isTraceEnabled()) { - logger.trace("Received message for WebSocket session id=" + session.getId() + ": " + message); - } - try { - TextMessage textMessage = new TextMessage(message); - this.handler.handleTextMessage(textMessage, this.webSocketSession); - } - catch (Throwable ex) { - tryCloseWithError(ex); - } - } - - private void handleBinaryMessage(javax.websocket.Session session, byte[] message, boolean isLast) { - if (logger.isTraceEnabled()) { - logger.trace("Received binary data for WebSocket session id=" + session.getId()); - } - try { - BinaryMessage binaryMessage = new BinaryMessage(message, isLast); - this.handler.handleBinaryMessage(binaryMessage, this.webSocketSession); - } - catch (Throwable ex) { - tryCloseWithError(ex); - } - } - - @Override - public void onClose(javax.websocket.Session session, CloseReason reason) { - if (logger.isDebugEnabled()) { - logger.debug("Connection closed, WebSocket session id=" + session.getId() + ", " + reason); - } - try { - CloseStatus closeStatus = new CloseStatus(reason.getCloseCode().getCode(), reason.getReasonPhrase()); - this.handler.afterConnectionClosed(closeStatus, this.webSocketSession); - } - catch (Throwable ex) { - logger.error("Unhandled error for " + this.webSocketSession, ex); - } - finally { - this.handlerProvider.destroy(this.handler); - } - } - - @Override - public void onError(javax.websocket.Session session, Throwable exception) { - logger.error("Error for WebSocket session id=" + session.getId(), exception); - try { - this.handler.handleTransportError(exception, this.webSocketSession); - } - catch (Throwable ex) { - tryCloseWithError(ex); - } - } - -} diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointRegistration.java b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointRegistration.java index 61546fc2528..ebde513034a 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointRegistration.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointRegistration.java @@ -29,14 +29,11 @@ import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpointConfig; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.util.Assert; import org.springframework.websocket.HandlerProvider; -import org.springframework.websocket.endpoint.WebSocketHandlerEndpoint; import org.springframework.websocket.support.BeanCreatingHandlerProvider; import org.springframework.websocket.support.SimpleHandlerProvider; @@ -44,8 +41,6 @@ import org.springframework.websocket.support.SimpleHandlerProvider; /** * An implementation of {@link javax.websocket.server.ServerEndpointConfig} that also * holds the target {@link javax.websocket.Endpoint} as a reference or a bean name. - * The target can also be {@link org.springframework.websocket.WebSocketHandler}, in - * which case it will be adapted via {@link WebSocketHandlerEndpoint}. * *

* Beans of this type are detected by {@link EndpointExporter} and diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/support/AbstractEndpointUpgradeStrategy.java b/spring-websocket/src/main/java/org/springframework/websocket/server/support/AbstractEndpointUpgradeStrategy.java index ec114934b57..e27ca8b66e1 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/server/support/AbstractEndpointUpgradeStrategy.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/support/AbstractEndpointUpgradeStrategy.java @@ -26,7 +26,7 @@ import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.websocket.HandlerProvider; import org.springframework.websocket.WebSocketHandler; -import org.springframework.websocket.endpoint.WebSocketHandlerEndpoint; +import org.springframework.websocket.adapter.StandardEndpointAdapter; import org.springframework.websocket.server.RequestUpgradeStrategy; /** @@ -49,7 +49,7 @@ public abstract class AbstractEndpointUpgradeStrategy implements RequestUpgradeS } protected Endpoint adaptWebSocketHandler(HandlerProvider handler) { - return new WebSocketHandlerEndpoint(handler); + return new StandardEndpointAdapter(handler); } protected abstract void upgradeInternal(ServerHttpRequest request, ServerHttpResponse response, diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/support/JettyRequestUpgradeStrategy.java b/spring-websocket/src/main/java/org/springframework/websocket/server/support/JettyRequestUpgradeStrategy.java index 2713266ff45..95e54b34e42 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/server/support/JettyRequestUpgradeStrategy.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/support/JettyRequestUpgradeStrategy.java @@ -17,18 +17,12 @@ package org.springframework.websocket.server.support; import java.io.IOException; -import java.net.URI; -import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; -import org.eclipse.jetty.websocket.api.WebSocketListener; import org.eclipse.jetty.websocket.server.HandshakeRFC6455; import org.eclipse.jetty.websocket.server.ServletWebSocketRequest; import org.eclipse.jetty.websocket.server.WebSocketServerFactory; @@ -38,14 +32,9 @@ import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.http.server.ServletServerHttpResponse; import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; -import org.springframework.websocket.BinaryMessage; -import org.springframework.websocket.CloseStatus; import org.springframework.websocket.HandlerProvider; -import org.springframework.websocket.TextMessage; import org.springframework.websocket.WebSocketHandler; -import org.springframework.websocket.WebSocketMessage; -import org.springframework.websocket.WebSocketSession; +import org.springframework.websocket.adapter.JettyWebSocketListenerAdapter; import org.springframework.websocket.server.RequestUpgradeStrategy; /** @@ -53,11 +42,10 @@ import org.springframework.websocket.server.RequestUpgradeStrategy; * {@code org.eclipse.jetty.websocket.server.WebSocketHandler} class. * * @author Phillip Webb + * @since 4.0 */ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy { - private static Log logger = LogFactory.getLog(JettyRequestUpgradeStrategy.class); - // FIXME jetty has options, timeouts etc. Do we need a common abstraction // FIXME need a way for someone to plug their own RequestUpgradeStrategy or override @@ -65,7 +53,7 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy { // FIXME when to call factory.cleanup(); - private static final String HANDLER_PROVIDER = JettyRequestUpgradeStrategy.class.getName() + private static final String HANDLER_PROVIDER_ATTR_NAME = JettyRequestUpgradeStrategy.class.getName() + ".HANDLER_PROVIDER"; private WebSocketServerFactory factory; @@ -76,12 +64,13 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy { this.factory.setCreator(new WebSocketCreator() { @Override @SuppressWarnings("unchecked") - public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) { - Assert.isInstanceOf(ServletWebSocketRequest.class, req); - ServletWebSocketRequest servletRequest = (ServletWebSocketRequest) req; - HandlerProvider handlerProvider = (HandlerProvider) servletRequest.getServletAttributes().get( - HANDLER_PROVIDER); - return new WebSocketHandlerAdapter(handlerProvider); + public Object createWebSocket(UpgradeRequest request, UpgradeResponse response) { + Assert.isInstanceOf(ServletWebSocketRequest.class, request); + ServletWebSocketRequest servletRequest = (ServletWebSocketRequest) request; + HandlerProvider handlerProvider = + (HandlerProvider) servletRequest.getServletAttributes().get( + HANDLER_PROVIDER_ATTR_NAME); + return new JettyWebSocketListenerAdapter(handlerProvider); } }); try { @@ -100,215 +89,24 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy { @Override public void upgrade(ServerHttpRequest request, ServerHttpResponse response, - String selectedProtocol, HandlerProvider handlerProvider) - throws IOException { + String selectedProtocol, HandlerProvider handlerProvider) throws IOException { + Assert.isInstanceOf(ServletServerHttpRequest.class, request); + HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); + Assert.isInstanceOf(ServletServerHttpResponse.class, response); - upgrade(((ServletServerHttpRequest) request).getServletRequest(), - ((ServletServerHttpResponse) response).getServletResponse(), - selectedProtocol, handlerProvider); + HttpServletResponse servletResponse = ((ServletServerHttpResponse) response).getServletResponse(); + + upgrade(servletRequest, servletResponse, selectedProtocol, handlerProvider); } private void upgrade(HttpServletRequest request, HttpServletResponse response, - String selectedProtocol, final HandlerProvider handlerProvider) - throws IOException { - request.setAttribute(HANDLER_PROVIDER, handlerProvider); - Assert.state(factory.isUpgradeRequest(request, response), "Not a suitable WebSocket upgrade request"); - Assert.state(factory.acceptWebSocket(request, response), "Unable to accept WebSocket"); - } + String selectedProtocol, final HandlerProvider handlerProvider) throws IOException { + Assert.state(this.factory.isUpgradeRequest(request, response), "Not a suitable WebSocket upgrade request"); + Assert.state(this.factory.acceptWebSocket(request, response), "Unable to accept WebSocket"); - /** - * Adapts Spring's {@link WebSocketHandler} to Jetty's {@link WebSocketListener}. - */ - private static class WebSocketHandlerAdapter implements WebSocketListener { - - private final HandlerProvider provider; - - private WebSocketHandler handler; - - private WebSocketSession session; - - private final AtomicInteger sessionCount = new AtomicInteger(0); - - - public WebSocketHandlerAdapter(HandlerProvider provider) { - Assert.notNull(provider, "Provider must not be null"); - Assert.isAssignable(WebSocketHandler.class, provider.getHandlerType()); - this.provider = provider; - } - - - @Override - public void onWebSocketConnect(Session session) { - - Assert.isTrue(this.sessionCount.compareAndSet(0, 1), "Unexpected connection"); - - this.session = new WebSocketSessionAdapter(session); - if (logger.isDebugEnabled()) { - logger.debug("Connection established, WebSocket session id=" - + this.session.getId() + ", uri=" + this.session.getURI()); - } - this.handler = this.provider.getHandler(); - - try { - this.handler.afterConnectionEstablished(this.session); - } - catch (Throwable ex) { - tryCloseWithError(ex); - } - } - - private void tryCloseWithError(Throwable ex) { - logger.error("Unhandled error for " + this.session, ex); - if (this.session.isOpen()) { - try { - this.session.close(CloseStatus.SERVER_ERROR); - } - catch (Throwable t) { - destroyHandler(); - } - } - } - - private void destroyHandler() { - try { - if (this.handler != null) { - this.provider.destroy(this.handler); - } - } - catch (Throwable t) { - logger.warn("Error while destroying handler", t); - } - finally { - this.session = null; - this.handler = null; - } - } - - @Override - public void onWebSocketClose(int statusCode, String reason) { - try { - CloseStatus closeStatus = new CloseStatus(statusCode, reason); - if (logger.isDebugEnabled()) { - logger.debug("Connection closed, WebSocket session id=" - + this.session.getId() + ", " + closeStatus); - } - this.handler.afterConnectionClosed(closeStatus, this.session); - } - catch (Throwable ex) { - logger.error("Unhandled error for " + this.session, ex); - } - finally { - destroyHandler(); - } - } - - @Override - public void onWebSocketText(String payload) { - try { - TextMessage message = new TextMessage(payload); - if (logger.isTraceEnabled()) { - logger.trace("Received message for WebSocket session id=" - + this.session.getId() + ": " + message); - } - this.handler.handleTextMessage(message, this.session); - } - catch(Throwable ex) { - tryCloseWithError(ex); - } - } - - @Override - public void onWebSocketBinary(byte[] payload, int offset, int len) { - try { - BinaryMessage message = new BinaryMessage(payload, offset, len); - if (logger.isTraceEnabled()) { - logger.trace("Received binary data for WebSocket session id=" - + this.session.getId() + ": " + message); - } - this.handler.handleBinaryMessage(message, this.session); - } - catch(Throwable ex) { - tryCloseWithError(ex); - } - } - - @Override - public void onWebSocketError(Throwable cause) { - try { - this.handler.handleTransportError(cause, this.session); - } - catch (Throwable ex) { - tryCloseWithError(ex); - } - } - } - - - /** - * Adapts Jetty's {@link Session} to Spring's {@link WebSocketSession}. - */ - private static class WebSocketSessionAdapter implements WebSocketSession { - - private Session session; - - - public WebSocketSessionAdapter(Session session) { - this.session = session; - } - - - @Override - public String getId() { - return ObjectUtils.getIdentityHexString(this.session); - } - - @Override - public boolean isOpen() { - return this.session.isOpen(); - } - - @Override - public boolean isSecure() { - return this.session.isSecure(); - } - - @Override - public URI getURI() { - return this.session.getUpgradeRequest().getRequestURI(); - } - - @Override - public void sendMessage(WebSocketMessage message) throws IOException { - if (message instanceof BinaryMessage) { - sendMessage((BinaryMessage) message); - } - else if (message instanceof TextMessage) { - sendMessage((TextMessage) message); - } - else { - throw new IllegalArgumentException("Unsupported message type"); - } - } - - private void sendMessage(BinaryMessage message) throws IOException { - this.session.getRemote().sendBytes(message.getPayload()); - } - - private void sendMessage(TextMessage message) throws IOException { - this.session.getRemote().sendString(message.getPayload()); - } - - @Override - public void close() throws IOException { - this.session.close(); - } - - @Override - public void close(CloseStatus status) throws IOException { - this.session.close(status.getCode(), status.getReason()); - } + request.setAttribute(HANDLER_PROVIDER_ATTR_NAME, handlerProvider); } }