diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserSessionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserSessionRegistry.java index 4074d6b9f8..f23da3e530 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserSessionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserSessionRegistry.java @@ -14,6 +14,8 @@ import java.util.Set; */ public interface UserSessionRegistry { + + /** * Return the active session id's for the given user. * @param user the user diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java index de1e148f28..3e1d939ed5 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java @@ -168,9 +168,9 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } /** - * Whether this HTTP transport streams message frames vs closing the response - * after each frame written (long polling). + * @deprecated as of 4.2 this method is no longer used for anything */ + @Deprecated protected abstract boolean isStreaming(); @@ -205,15 +205,10 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { // Let "our" handler know before sending the open frame to the remote handler delegateConnectionEstablished(); - if (isStreaming()) { - writePrelude(request, response); - writeFrame(SockJsFrame.openFrame()); - flushCache(); - this.readyToSend = true; - } - else { - writeFrame(SockJsFrame.openFrame()); - } + handleRequestInternal(request, response, true); + + // Request might have been reset (e.g. polling sessions do after writing) + this.readyToSend = isActive(); } catch (Throwable ex) { tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); @@ -222,9 +217,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } } - protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException { - } - /** * Handle all requests, except the first one, to receive messages on a SockJS * HTTP transport based session. @@ -251,20 +243,8 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { this.asyncRequestControl = request.getAsyncRequestControl(response); this.asyncRequestControl.start(-1); - if (isStreaming()) { - writePrelude(request, response); - flushCache(); - this.readyToSend = true; - } - else { - if (this.messageCache.isEmpty()) { - scheduleHeartbeat(); - this.readyToSend = true; - } - else { - flushCache(); - } - } + handleRequestInternal(request, response, false); + this.readyToSend = isActive(); } catch (Throwable ex) { tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); @@ -273,6 +253,14 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } } + /** + * Invoked when a SockJS transport request is received. + * @param request the current request + * @param response the current response + * @param initialRequest whether it is the first request for the session + */ + protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response, + boolean initialRequest) throws IOException; @Override protected final void sendMessageInternal(String message) throws SockJsTransportFailureException { @@ -287,23 +275,26 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } cancelHeartbeat(); flushCache(); - return; } else { if (logger.isTraceEnabled()) { logger.trace("Session is not active, not ready to flush."); } - return; } } } /** * Called when the connection is active and ready to write to the response. - * Subclasses should implement but never call this method directly. + * Subclasses should only call this method from a method where the + * "responseLock" is acquired. */ protected abstract void flushCache() throws SockJsTransportFailureException; + + protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException { + } + @Override protected void disconnect(CloseStatus status) { resetRequest(); @@ -341,12 +332,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { logger.trace("Writing to HTTP response: " + formattedFrame); } this.response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET)); - if (isStreaming()) { - this.response.flush(); - } - else { - resetRequest(); - } + this.response.flush(); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java index 48b9b7533b..0ece43cf68 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java @@ -16,8 +16,11 @@ package org.springframework.web.socket.sockjs.transport.session; +import java.io.IOException; import java.util.Map; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.frame.SockJsFrame; @@ -40,11 +43,31 @@ public class PollingSockJsSession extends AbstractHttpSockJsSession { } + /** + * @deprecated as of 4.2 this method is no longer used for anything + */ @Override + @Deprecated protected boolean isStreaming() { return false; } + @Override + protected void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response, + boolean initialRequest) throws IOException { + + if (initialRequest) { + writeFrame(SockJsFrame.openFrame()); + resetRequest(); + } + else if (!getMessageCache().isEmpty()) { + flushCache(); + } + else { + scheduleHeartbeat(); + } + } + @Override protected void flushCache() throws SockJsTransportFailureException { String[] messages = new String[getMessageCache().size()]; @@ -54,6 +77,7 @@ public class PollingSockJsSession extends AbstractHttpSockJsSession { SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec(); SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, messages); writeFrame(frame); + resetRequest(); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java index 06caafe088..a63f1898b3 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java @@ -16,8 +16,11 @@ package org.springframework.web.socket.sockjs.transport.session; +import java.io.IOException; import java.util.Map; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.frame.SockJsFrame; @@ -42,11 +45,26 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession { } + /** + * @deprecated as of 4.2 this method is no longer used for anything + */ @Override + @Deprecated protected boolean isStreaming() { return true; } + @Override + protected void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response, + boolean initialRequest) throws IOException { + + writePrelude(request, response); + if (initialRequest) { + writeFrame(SockJsFrame.openFrame()); + } + flushCache(); + } + @Override protected void flushCache() throws SockJsTransportFailureException { while (!getMessageCache().isEmpty()) { diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java index be7f1664f0..4427db6637 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java @@ -102,7 +102,7 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests