Add onClosed to SockJsSessionSupport sub-classes

As opposed to close(), which actively closes the session, the
onClosed method is called when the underlying connection has been
closed or disconnected.
This commit is contained in:
Rossen Stoyanchev 2013-04-17 11:19:28 -04:00
parent f056f7e2ad
commit 2794224b28
14 changed files with 253 additions and 182 deletions

View File

@ -27,6 +27,8 @@ import java.io.IOException;
*/
public interface SockJsSession {
String getId();
void sendMessage(String text) throws IOException;
void close();

View File

@ -114,6 +114,11 @@ public abstract class SockJsSessionSupport implements SockJsSession {
this.sockJsHandler.handleException(this, ex);
}
public void connectionClosed() {
this.state = State.CLOSED;
this.sockJsHandler.sessionClosed(this);
}
public void close() {
this.state = State.CLOSED;
this.sockJsHandler.sessionClosed(this);

View File

@ -58,10 +58,18 @@ public abstract class AbstractServerSession extends SockJsSessionSupport {
protected abstract void sendMessageInternal(String message) throws IOException;
@Override
public void connectionClosed() {
logger.debug("Session closed");
super.close();
cancelHeartbeat();
}
@Override
public final synchronized void close() {
if (!isClosed()) {
logger.debug("Closing session");
if (isActive()) {
// deliver messages "in flight" before sending close frame
try {
@ -71,9 +79,7 @@ public abstract class AbstractServerSession extends SockJsSessionSupport {
// ignore
}
}
super.close();
cancelHeartbeat();
closeInternal();
}

View File

@ -40,7 +40,6 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.DigestUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.util.UriUtils;
/**
@ -57,7 +56,7 @@ public abstract class AbstractSockJsService
private static final int ONE_YEAR = 365 * 24 * 60 * 60;
private final String prefix;
private String name = getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
private String clientLibraryUrl = "https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js";
@ -74,31 +73,25 @@ public abstract class AbstractSockJsService
private final TaskSchedulerHolder heartbeatSchedulerHolder;
/**
* Class constructor...
*
* @param prefix the path prefix for the SockJS service. All requests with a path
* that begins with the specified prefix will be handled by this service. In a
* Servlet container this is the path within the current servlet mapping.
*/
public AbstractSockJsService(String prefix) {
Assert.hasText(prefix, "prefix is required");
this.prefix = prefix;
public AbstractSockJsService() {
this.heartbeatSchedulerHolder = new TaskSchedulerHolder("SockJs-heartbeat-");
}
public AbstractSockJsService(String prefix, TaskScheduler heartbeatScheduler) {
Assert.hasText(prefix, "prefix is required");
public AbstractSockJsService(TaskScheduler heartbeatScheduler) {
Assert.notNull(heartbeatScheduler, "heartbeatScheduler is required");
this.prefix = prefix;
this.heartbeatSchedulerHolder = new TaskSchedulerHolder(heartbeatScheduler);
}
/**
* The path prefix to which the SockJS service is mapped.
* A unique name for the service mainly for logging purposes.
*/
public String getPrefix() {
return this.prefix;
public void setName(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
/**
@ -236,10 +229,6 @@ public abstract class AbstractSockJsService
// Ignore invalid Content-Type (TODO)
}
String path = UriUtils.decode(request.getURI().getPath(), "URF-8");
int index = path.indexOf(this.prefix);
sockJsPath = path.substring(index + this.prefix.length());
try {
if (sockJsPath.equals("") || sockJsPath.equals("/")) {
response.getHeaders().setContentType(new MediaType("text", "plain", Charset.forName("UTF-8")));

View File

@ -31,8 +31,6 @@ import org.springframework.websocket.WebSocketHandler;
*/
public interface SockJsService {
String getPrefix();
/**
* Pre-register {@link SockJsHandler} instances so they can be adapted to
* {@link WebSocketHandler} and hence re-used at runtime when

View File

@ -70,13 +70,11 @@ public class DefaultSockJsService extends AbstractSockJsService implements Initi
private final Map<SockJsHandler, WebSocketHandler> sockJsHandlers = new HashMap<SockJsHandler, WebSocketHandler>();
public DefaultSockJsService(String prefix) {
super(prefix);
public DefaultSockJsService() {
this.sessionTimeoutSchedulerHolder = new TaskSchedulerHolder("SockJs-sessionTimeout-");
}
public DefaultSockJsService(String prefix, TaskScheduler heartbeatScheduler, TaskScheduler sessionTimeoutScheduler) {
super(prefix, heartbeatScheduler);
public DefaultSockJsService(TaskScheduler heartbeatScheduler, TaskScheduler sessionTimeoutScheduler) {
Assert.notNull(sessionTimeoutScheduler, "sessionTimeoutScheduler is required");
this.sessionTimeoutSchedulerHolder = new TaskSchedulerHolder(sessionTimeoutScheduler);
}
@ -146,23 +144,23 @@ public class DefaultSockJsService extends AbstractSockJsService implements Initi
try {
int count = sessions.size();
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace("Checking " + count + " session(s) for timeouts [" + getPrefix() + "]");
logger.trace("Checking " + count + " session(s) for timeouts [" + getName() + "]");
}
for (SockJsSessionSupport session : sessions.values()) {
if (session.getTimeSinceLastActive() > getDisconnectDelay()) {
if (logger.isTraceEnabled()) {
logger.trace("Removing " + session + " for [" + getPrefix() + "]");
logger.trace("Removing " + session + " for [" + getName() + "]");
}
session.close();
sessions.remove(session.getId());
}
}
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace(sessions.size() + " remaining session(s) [" + getPrefix() + "]");
logger.trace(sessions.size() + " remaining session(s) [" + getName() + "]");
}
}
catch (Throwable t) {
logger.error("Failed to complete session timeout checks for [" + getPrefix() + "]", t);
logger.error("Failed to complete session timeout checks for [" + getName() + "]", t);
}
}
}, getDisconnectDelay());

View File

@ -46,6 +46,8 @@ import org.springframework.websocket.HandlerProvider;
*/
public class SockJsHttpRequestHandler implements HttpRequestHandler, BeanFactoryAware {
private final String prefix;
private final SockJsService sockJsService;
private final HandlerProvider<SockJsHandler> handlerProvider;
@ -53,23 +55,50 @@ public class SockJsHttpRequestHandler implements HttpRequestHandler, BeanFactory
private final UrlPathHelper urlPathHelper = new UrlPathHelper();
public SockJsHttpRequestHandler(SockJsService sockJsService, SockJsHandler sockJsHandler) {
/**
* Class constructor with {@link SockJsHandler} instance ...
*
* @param prefix the path prefix for the SockJS service. All requests with a path
* that begins with the specified prefix will be handled by this service. In a
* Servlet container this is the path within the current servlet mapping.
*/
public SockJsHttpRequestHandler(String prefix, SockJsService sockJsService, SockJsHandler sockJsHandler) {
Assert.hasText(prefix, "prefix is required");
Assert.notNull(sockJsService, "sockJsService is required");
Assert.notNull(sockJsHandler, "sockJsHandler is required");
this.prefix = prefix;
this.sockJsService = sockJsService;
this.sockJsService.registerSockJsHandlers(Collections.singleton(sockJsHandler));
this.handlerProvider = new HandlerProvider<SockJsHandler>(sockJsHandler);
}
public SockJsHttpRequestHandler(SockJsService sockJsService, Class<? extends SockJsHandler> sockJsHandlerClass) {
/**
* Class constructor with {@link SockJsHandler} type (per request) ...
*
* @param prefix the path prefix for the SockJS service. All requests with a path
* that begins with the specified prefix will be handled by this service. In a
* Servlet container this is the path within the current servlet mapping.
*/
public SockJsHttpRequestHandler(String prefix, SockJsService sockJsService,
Class<? extends SockJsHandler> sockJsHandlerClass) {
Assert.hasText(prefix, "prefix is required");
Assert.notNull(sockJsService, "sockJsService is required");
Assert.notNull(sockJsHandlerClass, "sockJsHandlerClass is required");
this.prefix = prefix;
this.sockJsService = sockJsService;
this.handlerProvider = new HandlerProvider<SockJsHandler>(sockJsHandlerClass);
}
public String getMappingPattern() {
return this.sockJsService.getPrefix() + "/**";
public String getPrefix() {
return this.prefix;
}
public String getPattern() {
return this.prefix + "/**";
}
@Override
@ -82,10 +111,9 @@ public class SockJsHttpRequestHandler implements HttpRequestHandler, BeanFactory
throws ServletException, IOException {
String lookupPath = this.urlPathHelper.getLookupPathForRequest(request);
String prefix = this.sockJsService.getPrefix();
Assert.isTrue(lookupPath.startsWith(prefix),
"Request path does not match the prefix of the SockJsService " + prefix);
Assert.isTrue(lookupPath.startsWith(this.prefix),
"Request path does not match the prefix of the SockJsService " + this.prefix);
String sockJsPath = lookupPath.substring(prefix.length());

View File

@ -108,6 +108,13 @@ public abstract class AbstractHttpServerSession extends AbstractServerSession {
*/
protected abstract void flushCache() throws IOException;
@Override
public void connectionClosed() {
super.connectionClosed();
resetRequest();
}
@Override
protected void closeInternal() {
resetRequest();
}

View File

@ -1,108 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.util.Assert;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.WebSocketSession;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractSockJsWebSocketHandler implements WebSocketHandler {
protected final Log logger = LogFactory.getLog(getClass());
private final SockJsConfiguration sockJsConfig;
private final SockJsHandler sockJsHandler;
private final Map<WebSocketSession, SockJsSessionSupport> sessions =
new ConcurrentHashMap<WebSocketSession, SockJsSessionSupport>();
public AbstractSockJsWebSocketHandler(SockJsConfiguration sockJsConfig, SockJsHandler sockJsHandler) {
Assert.notNull(sockJsConfig, "sockJsConfig is required");
Assert.notNull(sockJsHandler, "sockJsHandler is required");
this.sockJsConfig = sockJsConfig;
this.sockJsHandler = sockJsHandler;
}
protected SockJsConfiguration getSockJsConfig() {
return this.sockJsConfig;
}
protected SockJsHandler getSockJsHandler() {
return this.sockJsHandler;
}
protected SockJsSessionSupport getSockJsSession(WebSocketSession wsSession) {
return this.sessions.get(wsSession);
}
@Override
public void newSession(WebSocketSession wsSession) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("New session: " + wsSession);
}
SockJsSessionSupport session = createSockJsSession(wsSession);
this.sessions.put(wsSession, session);
}
protected abstract SockJsSessionSupport createSockJsSession(WebSocketSession wsSession) throws Exception;
@Override
public void handleTextMessage(WebSocketSession wsSession, String message) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Received payload " + message);
}
SockJsSessionSupport session = getSockJsSession(wsSession);
session.delegateMessages(message);
}
@Override
public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception {
// should not happen
throw new UnsupportedOperationException();
}
@Override
public void handleException(WebSocketSession webSocketSession, Throwable exception) {
SockJsSessionSupport session = getSockJsSession(webSocketSession);
session.delegateException(exception);
}
@Override
public void sessionClosed(WebSocketSession webSocketSession, int statusCode, String reason) throws Exception {
logger.debug("WebSocket connection closed " + webSocketSession);
SockJsSessionSupport session = this.sessions.remove(webSocketSession);
session.close();
}
}

View File

@ -17,12 +17,18 @@
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.server.AbstractServerSession;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsFrame;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.WebSocketSession;
@ -37,19 +43,47 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SockJsWebSocketHandler extends AbstractSockJsWebSocketHandler {
public class SockJsWebSocketHandler implements WebSocketHandler {
private static final Log logger = LogFactory.getLog(SockJsWebSocketHandler.class);
private final SockJsConfiguration sockJsConfig;
private final SockJsHandler sockJsHandler;
private final Map<WebSocketSession, SockJsSessionSupport> sessions =
new ConcurrentHashMap<WebSocketSession, SockJsSessionSupport>();
// TODO: JSON library used must be configurable
private final ObjectMapper objectMapper = new ObjectMapper();
public SockJsWebSocketHandler(SockJsConfiguration sockJsConfig, SockJsHandler sockJsHandler) {
super(sockJsConfig, sockJsHandler);
Assert.notNull(sockJsConfig, "sockJsConfig is required");
Assert.notNull(sockJsHandler, "sockJsHandler is required");
this.sockJsConfig = sockJsConfig;
this.sockJsHandler = sockJsHandler;
}
protected SockJsConfiguration getSockJsConfig() {
return this.sockJsConfig;
}
protected SockJsHandler getSockJsHandler() {
return this.sockJsHandler;
}
protected SockJsSessionSupport getSockJsSession(WebSocketSession wsSession) {
return this.sessions.get(wsSession);
}
@Override
protected SockJsSessionSupport createSockJsSession(WebSocketSession wsSession) throws Exception {
return new WebSocketServerSession(wsSession, getSockJsConfig());
public void newSession(WebSocketSession wsSession) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("New session: " + wsSession);
}
SockJsSessionSupport session = new WebSocketServerSession(wsSession, getSockJsConfig());
this.sessions.put(wsSession, session);
}
@Override
@ -72,6 +106,25 @@ public class SockJsWebSocketHandler extends AbstractSockJsWebSocketHandler {
}
}
@Override
public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception {
// should not happen
throw new UnsupportedOperationException();
}
@Override
public void handleException(WebSocketSession webSocketSession, Throwable exception) {
SockJsSessionSupport session = getSockJsSession(webSocketSession);
session.delegateException(exception);
}
@Override
public void sessionClosed(WebSocketSession webSocketSession, int statusCode, String reason) throws Exception {
logger.debug("WebSocket session closed " + webSocketSession);
SockJsSessionSupport session = this.sessions.remove(webSocketSession);
session.connectionClosed();
}
private class WebSocketServerSession extends AbstractServerSession {
@ -107,15 +160,23 @@ public class SockJsWebSocketHandler extends AbstractSockJsWebSocketHandler {
}
@Override
public void closeInternal() {
this.webSocketSession.close();
public void connectionClosed() {
super.connectionClosed();
this.webSocketSession = null;
}
@Override
public void closeInternal() {
deactivate();
updateLastActiveTime();
}
@Override
protected void deactivate() {
this.webSocketSession.close();
if (this.webSocketSession != null) {
this.webSocketSession.close();
this.webSocketSession = null;
}
}
}

View File

@ -17,10 +17,16 @@
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.util.Assert;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.WebSocketSession;
@ -33,22 +39,78 @@ import org.springframework.websocket.WebSocketSession;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebSocketSockJsHandlerAdapter extends AbstractSockJsWebSocketHandler {
public class WebSocketSockJsHandlerAdapter implements WebSocketHandler {
private static final Log logger = LogFactory.getLog(WebSocketSockJsHandlerAdapter.class);
private final SockJsConfiguration sockJsConfig;
private final SockJsHandler sockJsHandler;
private final Map<WebSocketSession, SockJsSessionSupport> sessions =
new ConcurrentHashMap<WebSocketSession, SockJsSessionSupport>();
public WebSocketSockJsHandlerAdapter(SockJsConfiguration sockJsConfig, SockJsHandler sockJsHandler) {
super(sockJsConfig, sockJsHandler);
Assert.notNull(sockJsConfig, "sockJsConfig is required");
Assert.notNull(sockJsHandler, "sockJsHandler is required");
this.sockJsConfig = sockJsConfig;
this.sockJsHandler = sockJsHandler;
}
protected SockJsConfiguration getSockJsConfig() {
return this.sockJsConfig;
}
protected SockJsHandler getSockJsHandler() {
return this.sockJsHandler;
}
protected SockJsSessionSupport getSockJsSession(WebSocketSession wsSession) {
return this.sessions.get(wsSession);
}
@Override
protected SockJsSessionSupport createSockJsSession(WebSocketSession wsSession) throws Exception {
return new WebSocketSessionAdapter(wsSession);
public void newSession(WebSocketSession wsSession) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("New session: " + wsSession);
}
SockJsSessionSupport session = new WebSocketSessionAdapter(wsSession);
this.sessions.put(wsSession, session);
}
@Override
public void handleTextMessage(WebSocketSession wsSession, String message) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Received payload " + message);
}
SockJsSessionSupport session = getSockJsSession(wsSession);
session.delegateMessages(message);
}
@Override
public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception {
// should not happen
throw new UnsupportedOperationException();
}
@Override
public void handleException(WebSocketSession webSocketSession, Throwable exception) {
SockJsSessionSupport session = getSockJsSession(webSocketSession);
session.delegateException(exception);
}
@Override
public void sessionClosed(WebSocketSession webSocketSession, int statusCode, String reason) throws Exception {
logger.debug("WebSocket session closed " + webSocketSession);
SockJsSessionSupport session = this.sessions.remove(webSocketSession);
session.connectionClosed();
}
private class WebSocketSessionAdapter extends SockJsSessionSupport {
private final WebSocketSession wsSession;
private WebSocketSession wsSession;
public WebSocketSessionAdapter(WebSocketSession wsSession) throws Exception {
@ -67,11 +129,20 @@ public class WebSocketSockJsHandlerAdapter extends AbstractSockJsWebSocketHandle
this.wsSession.sendText(message);
}
@Override
public void connectionClosed() {
logger.debug("Session closed");
super.connectionClosed();
this.wsSession = null;
}
@Override
public void close() {
if (!isClosed()) {
logger.debug("Closing session");
super.close();
this.wsSession.close();
this.wsSession = null;
}
}
}

View File

@ -27,6 +27,8 @@ import java.io.IOException;
*/
public interface WebSocketSession {
String getId();
boolean isOpen();
void sendText(String text) throws IOException;

View File

@ -40,6 +40,11 @@ public class StandardWebSocketSession implements WebSocketSession {
this.session = session;
}
@Override
public String getId() {
return this.session.getId();
}
@Override
public boolean isOpen() {
return ((this.session != null) && this.session.isOpen());

View File

@ -58,7 +58,7 @@ public class WebSocketHandlerEndpoint extends Endpoint {
try {
WebSocketSession webSocketSession = new StandardWebSocketSession(session);
this.sessions.put(session.getId(), webSocketSession);
session.addMessageHandler(new StandardMessageHandler(session.getId()));
session.addMessageHandler(new StandardMessageHandler(session));
this.webSocketHandler.newSession(webSocketSession);
}
catch (Throwable ex) {
@ -69,16 +69,19 @@ public class WebSocketHandlerEndpoint extends Endpoint {
@Override
public void onClose(javax.websocket.Session session, CloseReason closeReason) {
String id = session.getId();
if (logger.isDebugEnabled()) {
logger.debug("Closing session: " + session + ", " + closeReason);
logger.debug("Session closed: " + session + ", " + closeReason);
}
try {
WebSocketSession webSocketSession = getSession(id);
this.sessions.remove(id);
int code = closeReason.getCloseCode().getCode();
String reason = closeReason.getReasonPhrase();
this.webSocketHandler.sessionClosed(webSocketSession, code, reason);
WebSocketSession wsSession = this.sessions.remove(session.getId());
if (wsSession != null) {
int code = closeReason.getCloseCode().getCode();
String reason = closeReason.getReasonPhrase();
this.webSocketHandler.sessionClosed(wsSession, code, reason);
}
else {
Assert.notNull(wsSession, "No WebSocket session");
}
}
catch (Throwable ex) {
// TODO
@ -90,8 +93,13 @@ public class WebSocketHandlerEndpoint extends Endpoint {
public void onError(javax.websocket.Session session, Throwable exception) {
logger.error("Error for WebSocket session: " + session.getId(), exception);
try {
WebSocketSession webSocketSession = getSession(session.getId());
this.webSocketHandler.handleException(webSocketSession, exception);
WebSocketSession wsSession = getWebSocketSession(session);
if (wsSession != null) {
this.webSocketHandler.handleException(wsSession, exception);
}
else {
logger.warn("WebSocketSession not found. Perhaps onError was called after onClose?");
}
}
catch (Throwable ex) {
// TODO
@ -99,29 +107,28 @@ public class WebSocketHandlerEndpoint extends Endpoint {
}
}
private WebSocketSession getSession(String sourceSessionId) {
WebSocketSession webSocketSession = this.sessions.get(sourceSessionId);
Assert.notNull(webSocketSession, "No session");
return webSocketSession;
private WebSocketSession getWebSocketSession(javax.websocket.Session session) {
return this.sessions.get(session.getId());
}
private class StandardMessageHandler implements MessageHandler.Whole<String> {
private final String sessionId;
private final javax.websocket.Session session;
public StandardMessageHandler(String sessionId) {
this.sessionId = sessionId;
public StandardMessageHandler(javax.websocket.Session session) {
this.session = session;
}
@Override
public void onMessage(String message) {
if (logger.isTraceEnabled()) {
logger.trace("Message for session [" + this.sessionId + "]: " + message);
logger.trace("Message for session [" + this.session + "]: " + message);
}
WebSocketSession wsSession = getWebSocketSession(this.session);
Assert.notNull(wsSession, "WebSocketSession not found");
try {
WebSocketSession session = getSession(this.sessionId);
WebSocketHandlerEndpoint.this.webSocketHandler.handleTextMessage(session, message);
WebSocketHandlerEndpoint.this.webSocketHandler.handleTextMessage(wsSession, message);
}
catch (Throwable ex) {
// TODO