Improve APIs for WebSocket and SockJS messages

This commit is contained in:
Rossen Stoyanchev 2013-04-18 22:04:18 -04:00
parent 20a9df772a
commit ab5d60d343
21 changed files with 545 additions and 218 deletions

View File

@ -16,21 +16,36 @@
package org.springframework.sockjs;
import org.springframework.websocket.CloseStatus;
/**
* A handler for SockJS messages.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface SockJsHandler {
void newSession(SockJsSession session) throws Exception;
/**
* A new connection was opened and is ready for use.
*/
void afterConnectionEstablished(SockJsSession session) throws Exception;
void handleMessage(SockJsSession session, String message) throws Exception;
/**
* Handle an incoming message.
*/
void handleMessage(String message, SockJsSession session) throws Exception;
void handleException(SockJsSession session, Throwable exception);
/**
* TODO
*/
void handleError(Throwable exception, SockJsSession session);
void sessionClosed(SockJsSession session);
/**
* A connection has been closed.
*/
void afterConnectionClosed(CloseStatus status, SockJsSession session);
}

View File

@ -16,6 +16,8 @@
package org.springframework.sockjs;
import org.springframework.websocket.CloseStatus;
/**
*
@ -25,19 +27,19 @@ package org.springframework.sockjs;
public class SockJsHandlerAdapter implements SockJsHandler {
@Override
public void newSession(SockJsSession session) throws Exception {
public void afterConnectionEstablished(SockJsSession session) throws Exception {
}
@Override
public void handleMessage(SockJsSession session, String message) throws Exception {
public void handleMessage(String message, SockJsSession session) throws Exception {
}
@Override
public void handleException(SockJsSession session, Throwable exception) {
public void handleError(Throwable exception, SockJsSession session) {
}
@Override
public void sessionClosed(SockJsSession session) {
public void afterConnectionClosed(CloseStatus status, SockJsSession session) {
}
}

View File

@ -18,19 +18,45 @@ package org.springframework.sockjs;
import java.io.IOException;
import org.springframework.websocket.CloseStatus;
/**
* Allows sending SockJS messages as well as closing the underlying connection.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface SockJsSession {
/**
* Return a unique SockJS session identifier.
*/
String getId();
/**
* Return whether the connection is still open.
*/
boolean isOpen();
/**
* Send a message.
*/
void sendMessage(String text) throws IOException;
void close();
/**
* Close the underlying connection with status 1000, i.e. equivalent to:
* <pre>
* session.close(CloseStatus.NORMAL);
* </pre>
*/
void close() throws IOException;
/**
* Close the underlying connection with the given close status.
*/
void close(CloseStatus status) throws IOException;
}

View File

@ -18,6 +18,7 @@ package org.springframework.sockjs;
/**
* A factory for creating a SockJS session.
*
* @author Rossen Stoyanchev
* @since 4.0

View File

@ -16,9 +16,12 @@
package org.springframework.sockjs;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import org.springframework.websocket.CloseStatus;
/**
@ -99,33 +102,78 @@ public abstract class SockJsSessionSupport implements SockJsSession {
this.timeLastActive = System.currentTimeMillis();
}
public void connectionInitialized() throws Exception {
public void delegateConnectionEstablished() throws Exception {
this.state = State.OPEN;
this.sockJsHandler.newSession(this);
this.sockJsHandler.afterConnectionEstablished(this);
}
public void delegateMessages(String... messages) throws Exception {
public void delegateMessages(String[] messages) throws Exception {
for (String message : messages) {
this.sockJsHandler.handleMessage(this, message);
this.sockJsHandler.handleMessage(message, this);
}
}
public void delegateException(Throwable ex) {
this.sockJsHandler.handleException(this, ex);
public void delegateError(Throwable ex) {
this.sockJsHandler.handleError(ex, this);
}
public void connectionClosed() {
this.state = State.CLOSED;
this.sockJsHandler.sessionClosed(this);
/**
* Invoked in reaction to the underlying connection being closed by the remote side
* (or the WebSocket container) in order to perform cleanup and notify the
* {@link SockJsHandler}. This is in contrast to {@link #close()} that pro-actively
* closes the connection.
*/
public final void delegateConnectionClosed(CloseStatus status) {
if (!isClosed()) {
if (logger.isDebugEnabled()) {
logger.debug(this + " was closed, " + status);
}
try {
connectionClosedInternal(status);
}
finally {
this.state = State.CLOSED;
this.sockJsHandler.afterConnectionClosed(status, this);
}
}
}
public void close() {
this.state = State.CLOSED;
this.sockJsHandler.sessionClosed(this);
protected void connectionClosedInternal(CloseStatus status) {
}
/**
* {@inheritDoc}
* <p>Performs cleanup and notifies the {@link SockJsHandler}.
*/
public final void close() throws IOException {
close(CloseStatus.NORMAL);
}
/**
* {@inheritDoc}
* <p>Performs cleanup and notifies the {@link SockJsHandler}.
*/
public final void close(CloseStatus status) throws IOException {
if (!isClosed()) {
if (logger.isDebugEnabled()) {
logger.debug("Closing " + this + ", " + status);
}
try {
closeInternal(status);
}
finally {
this.state = State.CLOSED;
this.sockJsHandler.afterConnectionClosed(status, this);
}
}
}
protected abstract void closeInternal(CloseStatus status) throws IOException;
@Override
public String toString() {
return getClass().getSimpleName() + " [id=" + sessionId + "]";
return "SockJS session id=" + this.sessionId;
}

View File

@ -26,6 +26,7 @@ import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSession;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.util.Assert;
import org.springframework.websocket.CloseStatus;
/**
@ -42,9 +43,9 @@ public abstract class AbstractServerSockJsSession extends SockJsSessionSupport {
private ScheduledFuture<?> heartbeatTask;
public AbstractServerSockJsSession(String sessionId, SockJsConfiguration sockJsConfig, SockJsHandler sockJsHandler) {
public AbstractServerSockJsSession(String sessionId, SockJsConfiguration config, SockJsHandler sockJsHandler) {
super(sessionId, sockJsHandler);
this.sockJsConfig = sockJsConfig;
this.sockJsConfig = config;
}
protected SockJsConfiguration getSockJsConfig() {
@ -60,36 +61,34 @@ public abstract class AbstractServerSockJsSession extends SockJsSessionSupport {
@Override
public void connectionClosed() {
logger.debug("Session closed");
super.close();
public void connectionClosedInternal(CloseStatus status) {
updateLastActiveTime();
cancelHeartbeat();
}
@Override
public final synchronized void close() {
if (!isClosed()) {
logger.debug("Closing session");
if (isActive()) {
// deliver messages "in flight" before sending close frame
try {
writeFrame(SockJsFrame.closeFrameGoAway());
}
catch (Exception e) {
// ignore
}
public final synchronized void closeInternal(CloseStatus status) throws IOException {
if (isActive()) {
// TODO: deliver messages "in flight" before sending close frame
try {
// bypass writeFrame
writeFrameInternal(SockJsFrame.closeFrame(status.getCode(), status.getReason()));
}
catch (Throwable ex) {
logger.warn("Failed to send SockJS close frame: " + ex.getMessage());
}
super.close();
cancelHeartbeat();
closeInternal();
}
updateLastActiveTime();
cancelHeartbeat();
disconnect(status);
}
protected abstract void closeInternal();
// TODO: close status/reason
protected abstract void disconnect(CloseStatus status) throws IOException;
/**
* For internal use within a TransportHandler and the (TransportHandler-specific)
* session sub-class. The frame is written only if the connection is active.
* session sub-class.
*/
protected void writeFrame(SockJsFrame frame) throws IOException {
if (logger.isTraceEnabled()) {
@ -103,29 +102,20 @@ public abstract class AbstractServerSockJsSession extends SockJsSessionSupport {
logger.warn("Client went away. Terminating connection");
}
else {
logger.warn("Failed to send message. Terminating connection: " + ex.getMessage());
logger.warn("Terminating connection due to failure to send message: " + ex.getMessage());
}
deactivate();
close();
throw ex;
}
catch (Throwable t) {
logger.warn("Failed to send message. Terminating connection: " + t.getMessage());
deactivate();
catch (Throwable ex) {
logger.warn("Terminating connection due to failure to send message: " + ex.getMessage());
close();
throw new NestedSockJsRuntimeException("Failed to write frame " + frame, t);
throw new NestedSockJsRuntimeException("Failed to write frame " + frame, ex);
}
}
protected abstract void writeFrameInternal(SockJsFrame frame) throws IOException;
/**
* Some {@link TransportHandler} types cannot detect if a client connection is closed
* or lost and will eventually fail to send messages. When that happens, we need a way
* to disconnect the underlying connection before calling {@link #close()}.
*/
protected abstract void deactivate();
public synchronized void sendHeartbeat() throws IOException {
if (isActive()) {
writeFrame(SockJsFrame.heartbeatFrame());

View File

@ -56,7 +56,7 @@ public abstract class AbstractSockJsService
private static final int ONE_YEAR = 365 * 24 * 60 * 60;
private String name = getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
private String name = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
private String clientLibraryUrl = "https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js";

View File

@ -92,7 +92,7 @@ public abstract class AbstractHttpSendingTransportHandler
logger.debug("Opening " + getTransportType() + " connection");
session.setFrameFormat(getFrameFormat(request));
session.writeFrame(response, SockJsFrame.openFrame());
session.connectionInitialized();
session.delegateConnectionEstablished();
}
protected abstract MediaType getContentType();

View File

@ -29,6 +29,7 @@ import org.springframework.sockjs.server.SockJsFrame;
import org.springframework.sockjs.server.SockJsFrame.FrameFormat;
import org.springframework.sockjs.server.TransportHandler;
import org.springframework.util.Assert;
import org.springframework.websocket.CloseStatus;
/**
* An abstract base class for use with HTTP-based transports.
@ -109,14 +110,22 @@ public abstract class AbstractHttpServerSockJsSession extends AbstractServerSock
protected abstract void flushCache() throws IOException;
@Override
public void connectionClosed() {
super.connectionClosed();
protected void disconnect(CloseStatus status) {
resetRequest();
}
@Override
protected void closeInternal() {
resetRequest();
protected synchronized void resetRequest() {
updateLastActiveTime();
if (isActive()) {
try {
this.asyncRequest.completeAsync();
}
catch (Throwable ex) {
logger.warn("Failed to complete async request: " + ex.getMessage());
}
}
this.asyncRequest = null;
this.response = null;
}
protected synchronized void writeFrameInternal(SockJsFrame frame) throws IOException {
@ -138,20 +147,4 @@ public abstract class AbstractHttpServerSockJsSession extends AbstractServerSock
response.getBody().write(frame.getContentBytes());
}
@Override
protected void deactivate() {
this.asyncRequest = null;
this.response = null;
updateLastActiveTime();
}
protected synchronized void resetRequest() {
if (isActive()) {
this.asyncRequest.completeAsync();
}
this.asyncRequest = null;
this.response = null;
updateLastActiveTime();
}
}

View File

@ -17,7 +17,6 @@
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -30,6 +29,7 @@ import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsFrame;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.websocket.CloseStatus;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.WebSocketSession;
@ -78,21 +78,15 @@ public class SockJsWebSocketHandler implements WebSocketHandler {
}
@Override
public void newSession(WebSocketSession wsSession) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("New session: " + wsSession);
}
public void afterConnectionEstablished(WebSocketSession wsSession) throws Exception {
SockJsSessionSupport session = new WebSocketServerSockJsSession(wsSession, getSockJsConfig());
this.sessions.put(wsSession, session);
}
@Override
public void handleTextMessage(WebSocketSession wsSession, String message) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Received payload " + message + " for " + wsSession);
}
public void handleTextMessage(String message, WebSocketSession wsSession) throws Exception {
if (StringUtils.isEmpty(message)) {
logger.trace("Ignoring empty payload");
logger.trace("Ignoring empty message");
return;
}
try {
@ -107,41 +101,50 @@ public class SockJsWebSocketHandler implements WebSocketHandler {
}
@Override
public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception {
// should not happen
throw new UnsupportedOperationException();
public void handleBinaryMessage(byte[] message, WebSocketSession session) throws Exception {
logger.warn("Unexpected binary message for " + session);
session.close(CloseStatus.NOT_ACCEPTABLE);
}
@Override
public void handleException(WebSocketSession webSocketSession, Throwable exception) {
public void afterConnectionClosed(CloseStatus status, WebSocketSession wsSession) throws Exception {
SockJsSessionSupport session = this.sessions.remove(wsSession);
session.delegateConnectionClosed(status);
}
@Override
public void handleError(Throwable exception, WebSocketSession webSocketSession) {
SockJsSessionSupport session = getSockJsSession(webSocketSession);
session.delegateException(exception);
session.delegateError(exception);
}
@Override
public void sessionClosed(WebSocketSession webSocketSession, int statusCode, String reason) throws Exception {
logger.debug("WebSocket session closed " + webSocketSession);
SockJsSessionSupport session = this.sessions.remove(webSocketSession);
session.connectionClosed();
private static String getSockJsSessionId(WebSocketSession wsSession) {
Assert.notNull(wsSession, "wsSession is required");
String path = wsSession.getURI().getPath();
String[] segments = StringUtils.tokenizeToStringArray(path, "/");
Assert.isTrue(segments.length > 3, "SockJS request should have at least 3 patgh segments: " + path);
return segments[segments.length-2];
}
private class WebSocketServerSockJsSession extends AbstractServerSockJsSession {
private WebSocketSession webSocketSession;
private final WebSocketSession wsSession;
public WebSocketServerSockJsSession(WebSocketSession wsSession, SockJsConfiguration sockJsConfig) throws Exception {
super(String.valueOf(wsSession.hashCode()), sockJsConfig, getSockJsHandler());
this.webSocketSession = wsSession;
this.webSocketSession.sendText(SockJsFrame.openFrame().getContent());
public WebSocketServerSockJsSession(WebSocketSession wsSession, SockJsConfiguration sockJsConfig)
throws Exception {
super(getSockJsSessionId(wsSession), sockJsConfig, getSockJsHandler());
this.wsSession = wsSession;
this.wsSession.sendTextMessage(SockJsFrame.openFrame().getContent());
scheduleHeartbeat();
connectionInitialized();
delegateConnectionEstablished();
}
@Override
public boolean isActive() {
return ((this.webSocketSession != null) && this.webSocketSession.isOpen());
return this.wsSession.isOpen();
}
@Override
@ -156,27 +159,12 @@ public class SockJsWebSocketHandler implements WebSocketHandler {
if (logger.isTraceEnabled()) {
logger.trace("Write " + frame);
}
this.webSocketSession.sendText(frame.getContent());
this.wsSession.sendTextMessage(frame.getContent());
}
@Override
public void connectionClosed() {
super.connectionClosed();
this.webSocketSession = null;
}
@Override
public void closeInternal() {
deactivate();
updateLastActiveTime();
}
@Override
protected void deactivate() {
if (this.webSocketSession != null) {
this.webSocketSession.close();
this.webSocketSession = null;
}
protected void disconnect(CloseStatus status) throws IOException {
this.wsSession.close(status);
}
}

View File

@ -17,7 +17,6 @@
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -27,14 +26,15 @@ import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.util.Assert;
import org.springframework.websocket.CloseStatus;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.WebSocketSession;
/**
* A {@link WebSocketHandler} that merely delegates to a {@link SockJsHandler} without any
* SockJS message framing. For use with raw WebSocket communication at SockJS path
* "/websocket".
* A plain {@link WebSocketHandler} to {@link SockJsHandler} adapter that merely delegates
* without any additional SockJS message framing. Used for raw WebSocket communication at
* SockJS path "/websocket".
*
* @author Rossen Stoyanchev
* @since 4.0
@ -71,79 +71,61 @@ public class WebSocketSockJsHandlerAdapter implements WebSocketHandler {
}
@Override
public void newSession(WebSocketSession wsSession) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("New session: " + wsSession);
}
public void afterConnectionEstablished(WebSocketSession wsSession) throws Exception {
SockJsSessionSupport session = new SockJsWebSocketSessionAdapter(wsSession);
this.sessions.put(wsSession, session);
}
@Override
public void handleTextMessage(WebSocketSession wsSession, String message) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Received payload " + message);
}
public void handleTextMessage(String message, WebSocketSession wsSession) throws Exception {
SockJsSessionSupport session = getSockJsSession(wsSession);
session.delegateMessages(message);
session.delegateMessages(new String[] { message });
}
@Override
public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception {
// should not happen
throw new UnsupportedOperationException();
public void handleBinaryMessage(byte[] message, WebSocketSession session) throws Exception {
logger.warn("Unexpected binary message for " + session);
session.close(CloseStatus.NOT_ACCEPTABLE);
}
@Override
public void handleException(WebSocketSession webSocketSession, Throwable exception) {
SockJsSessionSupport session = getSockJsSession(webSocketSession);
session.delegateException(exception);
public void afterConnectionClosed(CloseStatus status, WebSocketSession wsSession) throws Exception {
SockJsSessionSupport session = this.sessions.remove(wsSession);
session.delegateConnectionClosed(status);
}
@Override
public void sessionClosed(WebSocketSession webSocketSession, int statusCode, String reason) throws Exception {
logger.debug("WebSocket session closed " + webSocketSession);
SockJsSessionSupport session = this.sessions.remove(webSocketSession);
session.connectionClosed();
public void handleError(Throwable exception, WebSocketSession wsSession) {
logger.error("Error for " + wsSession);
SockJsSessionSupport session = getSockJsSession(wsSession);
session.delegateError(exception);
}
private class SockJsWebSocketSessionAdapter extends SockJsSessionSupport {
private WebSocketSession wsSession;
private final WebSocketSession wsSession;
public SockJsWebSocketSessionAdapter(WebSocketSession wsSession) throws Exception {
super(String.valueOf(wsSession.hashCode()), getSockJsHandler());
super(wsSession.getId(), getSockJsHandler());
this.wsSession = wsSession;
connectionInitialized();
delegateConnectionEstablished();
}
@Override
public boolean isActive() {
return (!isClosed() && this.wsSession.isOpen());
return this.wsSession.isOpen();
}
@Override
public void sendMessage(String message) throws IOException {
this.wsSession.sendText(message);
this.wsSession.sendTextMessage(message);
}
@Override
public void connectionClosed() {
logger.debug("Session closed");
super.connectionClosed();
this.wsSession = null;
}
@Override
public void close() {
if (!isClosed()) {
logger.debug("Closing session");
super.close();
this.wsSession.close();
this.wsSession = null;
}
public void closeInternal(CloseStatus status) throws IOException {
this.wsSession.close(status);
}
}

View File

@ -50,9 +50,9 @@ public class WebSocketTransportHandler implements ConfigurableTransportHandler,
private SockJsConfiguration sockJsConfig;
private final Map<SockJsHandler, WebSocketHandler> sockJsHandlers = new HashMap<SockJsHandler, WebSocketHandler>();
private final Map<SockJsHandler, WebSocketHandler> sockJsHandlerCache = new HashMap<SockJsHandler, WebSocketHandler>();
private final Collection<WebSocketHandler> rawWebSocketHandlers = new ArrayList<WebSocketHandler>();
private final Collection<WebSocketHandler> rawWebSocketHandlerCache = new ArrayList<WebSocketHandler>();
public WebSocketTransportHandler(HandshakeHandler handshakeHandler) {
@ -72,9 +72,9 @@ public class WebSocketTransportHandler implements ConfigurableTransportHandler,
@Override
public void registerSockJsHandlers(Collection<SockJsHandler> sockJsHandlers) {
this.sockJsHandlers.clear();
this.sockJsHandlerCache.clear();
for (SockJsHandler sockJsHandler : sockJsHandlers) {
this.sockJsHandlers.put(sockJsHandler, adaptSockJsHandler(sockJsHandler));
this.sockJsHandlerCache.put(sockJsHandler, adaptSockJsHandler(sockJsHandler));
}
this.handshakeHandler.registerWebSocketHandlers(getAllWebSocketHandlers());
}
@ -89,8 +89,8 @@ public class WebSocketTransportHandler implements ConfigurableTransportHandler,
private Collection<WebSocketHandler> getAllWebSocketHandlers() {
Set<WebSocketHandler> handlers = new HashSet<WebSocketHandler>();
handlers.addAll(this.sockJsHandlers.values());
handlers.addAll(this.rawWebSocketHandlers);
handlers.addAll(this.sockJsHandlerCache.values());
handlers.addAll(this.rawWebSocketHandlerCache);
return handlers;
}
@ -98,7 +98,7 @@ public class WebSocketTransportHandler implements ConfigurableTransportHandler,
public void handleRequest(ServerHttpRequest request, ServerHttpResponse response,
SockJsHandler sockJsHandler, SockJsSessionSupport session) throws Exception {
WebSocketHandler webSocketHandler = this.sockJsHandlers.get(sockJsHandler);
WebSocketHandler webSocketHandler = this.sockJsHandlerCache.get(sockJsHandler);
if (webSocketHandler == null) {
webSocketHandler = adaptSockJsHandler(sockJsHandler);
}
@ -110,8 +110,8 @@ public class WebSocketTransportHandler implements ConfigurableTransportHandler,
@Override
public void registerWebSocketHandlers(Collection<WebSocketHandler> webSocketHandlers) {
this.rawWebSocketHandlers.clear();
this.rawWebSocketHandlers.addAll(webSocketHandlers);
this.rawWebSocketHandlerCache.clear();
this.rawWebSocketHandlerCache.addAll(webSocketHandlers);
this.handshakeHandler.registerWebSocketHandlers(getAllWebSocketHandlers());
}

View File

@ -0,0 +1,192 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.websocket;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Represents a WebSocket close status code and reason. Status codes in the 1xxx range are
* pre-defined by the protocol. Optionally, a status code may be sent with a reason.
* <p>
* See <a href="https://tools.ietf.org/html/rfc6455#section-7.4.1">RFC 6455, Section 7.4.1
* "Defined Status Codes"</a>.
*
* @author Rossen Stoyanchev
* @since 4.0
*
*/
public final class CloseStatus {
/**
* "1000 indicates a normal closure, meaning that the purpose for which the connection
* was established has been fulfilled."
*/
public static final CloseStatus NORMAL = new CloseStatus(1000);
/**
* "1001 indicates that an endpoint is "going away", such as a server going down or a
* browser having navigated away from a page."
*/
public static final CloseStatus GOING_AWAY = new CloseStatus(1001);
/**
* "1002 indicates that an endpoint is terminating the connection due to a protocol
* error."
*/
public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
/**
* "1003 indicates that an endpoint is terminating the connection because it has
* received a type of data it cannot accept (e.g., an endpoint that understands only
* text data MAY send this if it receives a binary message)."
*/
public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
// 10004: Reserved.
// The specific meaning might be defined in the future.
/**
* "1005 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that no status code was actually present."
*/
public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);
/**
* "1006 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that the connection was closed abnormally, e.g., without sending
* or receiving a Close control frame."
*/
public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);
/**
* "1007 indicates that an endpoint is terminating the connection because it has
* received data within a message that was not consistent with the type of the message
* (e.g., non-UTF-8 [RFC3629] data within a text message)."
*/
public static final CloseStatus BAD_DATA = new CloseStatus(1007);
/**
* "1008 indicates that an endpoint is terminating the connection because it has
* received a message that violates its policy. This is a generic status code that can
* be returned when there is no other more suitable status code (e.g., 1003 or 1009)
* or if there is a need to hide specific details about the policy."
*/
public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);
/**
* "1009 indicates that an endpoint is terminating the connection because it has
* received a message that is too big for it to process."
*/
public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);
/**
* "1010 indicates that an endpoint (client) is terminating the connection because it
* has expected the server to negotiate one or more extension, but the server didn't
* return them in the response message of the WebSocket handshake. The list of
* extensions that are needed SHOULD appear in the /reason/ part of the Close frame.
* Note that this status code is not used by the server, because it can fail the
* WebSocket handshake instead."
*/
public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);
/**
* "1011 indicates that a server is terminating the connection because it encountered
* an unexpected condition that prevented it from fulfilling the request."
*/
public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
/**
* 1012 indicates that the service is restarted. A client may reconnect, and if it
* choses to do, should reconnect using a randomized delay of 5 - 30s.
* <p>See <a href="https://www.ietf.org/mail-archive/web/hybi/current/msg09649.html">
* [hybi] Additional WebSocket Close Error Codes</a>
*/
public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);
/**
* 1013 indicates that the service is experiencing overload. A client should only
* connect to a different IP (when there are multiple for the target) or reconnect to
* the same IP upon user action.
* <p>See <a href="https://www.ietf.org/mail-archive/web/hybi/current/msg09649.html">
* [hybi] Additional WebSocket Close Error Codes</a>
*/
public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);
/**
* "1015 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that the connection was closed due to a failure to perform a TLS
* handshake (e.g., the server certificate can't be verified)."
*/
public static final CloseStatus TLS_HANDSHAKE_FAILURE = new CloseStatus(1015);
private final int code;
private final String reason;
public CloseStatus(int code) {
this(code, null);
}
public CloseStatus(int code, String reason) {
Assert.isTrue((code >= 1000 && code < 5000), "Invalid code");
this.code = code;
this.reason = reason;
}
public int getCode() {
return this.code;
}
public String getReason() {
return this.reason;
}
public CloseStatus withReason(String reason) {
Assert.hasText(reason, "Expected non-empty reason");
return new CloseStatus(this.code, reason);
}
@Override
public int hashCode() {
return this.code * 29 + ObjectUtils.nullSafeHashCode(this.reason);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof CloseStatus)) {
return false;
}
CloseStatus otherStatus = (CloseStatus) other;
return (this.code == otherStatus.code && ObjectUtils.nullSafeEquals(this.reason, otherStatus.reason));
}
@Override
public String toString() {
return "CloseStatus [code=" + this.code + ", reason=" + this.reason + "]";
}
}

View File

@ -16,24 +16,39 @@
package org.springframework.websocket;
import java.io.InputStream;
/**
* A handler for WebSocket messages.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface WebSocketHandler {
void newSession(WebSocketSession session) throws Exception;
/**
* A new WebSocket connection has been opened and is ready to be used.
*/
void afterConnectionEstablished(WebSocketSession session) throws Exception;
void handleTextMessage(WebSocketSession session, String message) throws Exception;
/**
* Handle an incoming text message.
*/
void handleTextMessage(String message, WebSocketSession session) throws Exception;
void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception;
/**
* Handle an incoming binary message.
*/
void handleBinaryMessage(byte[] bytes, WebSocketSession session) throws Exception;
void handleException(WebSocketSession session, Throwable exception);
/**
* TODO
*/
void handleError(Throwable exception, WebSocketSession session);
void sessionClosed(WebSocketSession session, int statusCode, String reason) throws Exception;
/**
* A WebSocket connection has been closed.
*/
void afterConnectionClosed(CloseStatus closeStatus, WebSocketSession session) throws Exception;
}

View File

@ -16,7 +16,6 @@
package org.springframework.websocket;
import java.io.InputStream;
/**
*
@ -26,23 +25,23 @@ import java.io.InputStream;
public class WebSocketHandlerAdapter implements WebSocketHandler {
@Override
public void newSession(WebSocketSession session) throws Exception {
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
}
@Override
public void handleTextMessage(WebSocketSession session, String message) throws Exception {
public void handleTextMessage(String message, WebSocketSession session) throws Exception {
}
@Override
public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception {
public void handleBinaryMessage(byte[] message, WebSocketSession session) throws Exception {
}
@Override
public void handleException(WebSocketSession session, Throwable exception) {
public void handleError(Throwable exception, WebSocketSession session) {
}
@Override
public void sessionClosed(WebSocketSession session, int statusCode, String reason) throws Exception {
public void afterConnectionClosed(CloseStatus status, WebSocketSession session) throws Exception {
}
}

View File

@ -17,24 +17,60 @@
package org.springframework.websocket;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
/**
* Allows sending messages over a WebSocket connection as well as closing it.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface WebSocketSession {
/**
* Return a unique session identifier.
*/
String getId();
/**
* Return whether the connection is still open.
*/
boolean isOpen();
void sendText(String text) throws IOException;
/**
* Return whether the underlying socket is using a secure transport.
*/
boolean isSecure();
void close();
/**
* Return the URI used to open the WebSocket connection.
*/
URI getURI();
void close(int code, String reason);
/**
* Send a text message.
*/
void sendTextMessage(String message) throws IOException;
/**
* Send a binary message.
*/
void sendBinaryMessage(ByteBuffer message) throws IOException;
/**
* Close the WebSocket connection with status 1000, i.e. equivalent to:
* <pre>
* session.close(CloseStatus.NORMAL);
* </pre>
*/
void close() throws IOException;
/**
* Close the WebSocket connection with the given close status.
*/
void close(CloseStatus status) throws IOException;
}

View File

@ -17,14 +17,22 @@
package org.springframework.websocket.endpoint;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import org.springframework.websocket.CloseStatus;
import org.springframework.websocket.WebSocketSession;
/**
* A {@link WebSocketSession} that delegates to a {@link javax.websocket.Session}.
* A standard Java implementation of {@link WebSocketSession} that delegates to
* {@link javax.websocket.Session}.
*
* @author Rossen Stoyanchev
* @since 4.0
@ -33,10 +41,11 @@ public class StandardWebSocketSession implements WebSocketSession {
private static Log logger = LogFactory.getLog(StandardWebSocketSession.class);
private javax.websocket.Session session;
private final javax.websocket.Session session;
public StandardWebSocketSession(javax.websocket.Session session) {
Assert.notNull(session, "session is required");
this.session = session;
}
@ -47,25 +56,53 @@ public class StandardWebSocketSession implements WebSocketSession {
@Override
public boolean isOpen() {
return ((this.session != null) && this.session.isOpen());
return this.session.isOpen();
}
@Override
public void sendText(String text) throws IOException {
logger.trace("Sending text message: " + text);
// TODO: check closed
public boolean isSecure() {
return this.session.isSecure();
}
@Override
public URI getURI() {
return this.session.getRequestURI();
}
@Override
public void sendTextMessage(String text) throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("Sending text message: " + text + ", " + this);
}
Assert.isTrue(isOpen(), "Cannot send message after connection closed.");
this.session.getBasicRemote().sendText(text);
}
@Override
public void close() {
// TODO: delegate with code and reason
this.session = null;
public void sendBinaryMessage(ByteBuffer message) throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("Sending binary message, " + this);
}
Assert.isTrue(isOpen(), "Cannot send message after connection closed.");
this.session.getBasicRemote().sendBinary(message);
}
@Override
public void close(int code, String reason) {
this.session = null;
public void close() throws IOException {
close(CloseStatus.NORMAL);
}
@Override
public void close(CloseStatus status) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("Closing " + this);
}
this.session.close(new CloseReason(CloseCodes.getCloseCode(status.getCode()), status.getReason()));
}
@Override
public String toString() {
return "WebSocket session id=" + getId();
}
}

View File

@ -27,6 +27,7 @@ import javax.websocket.MessageHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import org.springframework.websocket.CloseStatus;
import org.springframework.websocket.WebSocketSession;
import org.springframework.websocket.WebSocketHandler;
@ -53,13 +54,14 @@ public class WebSocketHandlerEndpoint extends Endpoint {
@Override
public void onOpen(javax.websocket.Session session, EndpointConfig config) {
if (logger.isDebugEnabled()) {
logger.debug("New session: " + session);
logger.debug("Client connected, WebSocket session id=" + session.getId()
+ ", path=" + session.getRequestURI().getPath());
}
try {
WebSocketSession webSocketSession = new StandardWebSocketSession(session);
this.sessions.put(session.getId(), webSocketSession);
session.addMessageHandler(new StandardMessageHandler(session));
this.webSocketHandler.newSession(webSocketSession);
this.webSocketHandler.afterConnectionEstablished(webSocketSession);
}
catch (Throwable ex) {
// TODO
@ -68,16 +70,15 @@ public class WebSocketHandlerEndpoint extends Endpoint {
}
@Override
public void onClose(javax.websocket.Session session, CloseReason closeReason) {
public void onClose(javax.websocket.Session session, CloseReason reason) {
if (logger.isDebugEnabled()) {
logger.debug("Session closed: " + session + ", " + closeReason);
logger.debug("Client disconnected, WebSocket session id=" + session.getId() + ", " + reason);
}
try {
WebSocketSession wsSession = this.sessions.remove(session.getId());
if (wsSession != null) {
int code = closeReason.getCloseCode().getCode();
String reason = closeReason.getReasonPhrase();
this.webSocketHandler.sessionClosed(wsSession, code, reason);
CloseStatus closeStatus = new CloseStatus(reason.getCloseCode().getCode(), reason.getReasonPhrase());
this.webSocketHandler.afterConnectionClosed(closeStatus, wsSession);
}
else {
Assert.notNull(wsSession, "No WebSocket session");
@ -91,11 +92,11 @@ public class WebSocketHandlerEndpoint extends Endpoint {
@Override
public void onError(javax.websocket.Session session, Throwable exception) {
logger.error("Error for WebSocket session: " + session.getId(), exception);
logger.error("Error for WebSocket session id=" + session.getId(), exception);
try {
WebSocketSession wsSession = getWebSocketSession(session);
if (wsSession != null) {
this.webSocketHandler.handleException(wsSession, exception);
this.webSocketHandler.handleError(exception, wsSession);
}
else {
logger.warn("WebSocketSession not found. Perhaps onError was called after onClose?");
@ -123,12 +124,12 @@ public class WebSocketHandlerEndpoint extends Endpoint {
@Override
public void onMessage(String message) {
if (logger.isTraceEnabled()) {
logger.trace("Message for session [" + this.session + "]: " + message);
logger.trace("Received message for WebSocket session id=" + this.session.getId() + ": " + message);
}
WebSocketSession wsSession = getWebSocketSession(this.session);
Assert.notNull(wsSession, "WebSocketSession not found");
try {
WebSocketHandlerEndpoint.this.webSocketHandler.handleTextMessage(wsSession, message);
WebSocketHandlerEndpoint.this.webSocketHandler.handleTextMessage(message, wsSession);
}
catch (Throwable ex) {
// TODO

View File

@ -75,6 +75,6 @@ public abstract class AbstractEndpointUpgradeStrategy implements RequestUpgradeS
}
protected abstract void upgradeInternal(ServerHttpRequest request, ServerHttpResponse response,
String protocol, Endpoint endpoint) throws Exception;
String selectedProtocol, Endpoint endpoint) throws Exception;
}

View File

@ -18,6 +18,7 @@ package org.springframework.websocket.server.support;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.util.Arrays;
import java.util.Random;
import javax.servlet.http.HttpServletRequest;
@ -66,7 +67,7 @@ public class GlassfishRequestUpgradeStrategy extends AbstractEndpointUpgradeStra
@Override
public void upgradeInternal(ServerHttpRequest request, ServerHttpResponse response,
String protocol, Endpoint endpoint) throws Exception {
String selectedProtocol, Endpoint endpoint) throws Exception {
Assert.isTrue(request instanceof ServletServerHttpRequest);
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
@ -75,7 +76,7 @@ public class GlassfishRequestUpgradeStrategy extends AbstractEndpointUpgradeStra
HttpServletResponse servletResponse = ((ServletServerHttpResponse) response).getServletResponse();
servletResponse = new AlreadyUpgradedResponseWrapper(servletResponse);
TyrusEndpoint tyrusEndpoint = createTyrusEndpoint(servletRequest, endpoint);
TyrusEndpoint tyrusEndpoint = createTyrusEndpoint(servletRequest, endpoint, selectedProtocol);
WebSocketEngine engine = WebSocketEngine.getEngine();
engine.register(tyrusEndpoint);
@ -112,7 +113,7 @@ public class GlassfishRequestUpgradeStrategy extends AbstractEndpointUpgradeStra
});
}
private TyrusEndpoint createTyrusEndpoint(HttpServletRequest request, Endpoint endpoint) {
private TyrusEndpoint createTyrusEndpoint(HttpServletRequest request, Endpoint endpoint, String selectedProtocol) {
// Use randomized path
String requestUri = request.getRequestURI();
@ -120,6 +121,7 @@ public class GlassfishRequestUpgradeStrategy extends AbstractEndpointUpgradeStra
String endpointPath = requestUri.endsWith("/") ? requestUri + randomValue : requestUri + "/" + randomValue;
EndpointRegistration endpointConfig = new EndpointRegistration(endpointPath, endpoint);
endpointConfig.setSubprotocols(Arrays.asList(selectedProtocol));
return new TyrusEndpoint(new EndpointWrapper(endpoint, endpointConfig,
ComponentProviderService.create(), null, "/", new ErrorCollector(),

View File

@ -51,7 +51,7 @@ public class TomcatRequestUpgradeStrategy extends AbstractEndpointUpgradeStrateg
@Override
public void upgradeInternal(ServerHttpRequest request, ServerHttpResponse response,
String protocol, Endpoint endpoint) throws IOException {
String selectedProtocol, Endpoint endpoint) throws IOException {
Assert.isTrue(request instanceof ServletServerHttpRequest);
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
@ -74,7 +74,7 @@ public class TomcatRequestUpgradeStrategy extends AbstractEndpointUpgradeStrateg
ServerEndpointConfig endpointConfig = new EndpointRegistration("/shouldntmatter", endpoint);
upgradeHandler.preInit(endpoint, endpointConfig, serverContainer, webSocketRequest,
protocol, Collections.<String, String> emptyMap(), servletRequest.isSecure());
selectedProtocol, Collections.<String, String> emptyMap(), servletRequest.isSecure());
}
}