From 43d93712f1a827cf00a9b097dd546320ce841692 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 9 Dec 2014 09:47:36 -0500 Subject: [PATCH] Remove isStreaming flag from AbstractHttpSockJsSession This change removes the need for the isStreaming field from the base class AbstractHttpSockJsSession. This field was used to account for differences between polling vs streaming SockJS sessions without having to expose to sub-classes private fields that are otherwise protected from concurrent access by the base class. The change manages to delegate to sub-classes without providing direct access to protected fields. Issue: SPR-12427 --- .../simp/user/UserSessionRegistry.java | 2 + .../session/AbstractHttpSockJsSession.java | 60 +++++++------------ .../session/PollingSockJsSession.java | 24 ++++++++ .../session/StreamingSockJsSession.java | 18 ++++++ .../session/HttpSockJsSessionTests.java | 2 +- .../session/TestHttpSockJsSession.java | 4 +- 6 files changed, 71 insertions(+), 39 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserSessionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserSessionRegistry.java index 4074d6b9f8..f23da3e530 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserSessionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserSessionRegistry.java @@ -14,6 +14,8 @@ import java.util.Set; */ public interface UserSessionRegistry { + + /** * Return the active session id's for the given user. * @param user the user diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java index de1e148f28..3e1d939ed5 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java @@ -168,9 +168,9 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } /** - * Whether this HTTP transport streams message frames vs closing the response - * after each frame written (long polling). + * @deprecated as of 4.2 this method is no longer used for anything */ + @Deprecated protected abstract boolean isStreaming(); @@ -205,15 +205,10 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { // Let "our" handler know before sending the open frame to the remote handler delegateConnectionEstablished(); - if (isStreaming()) { - writePrelude(request, response); - writeFrame(SockJsFrame.openFrame()); - flushCache(); - this.readyToSend = true; - } - else { - writeFrame(SockJsFrame.openFrame()); - } + handleRequestInternal(request, response, true); + + // Request might have been reset (e.g. polling sessions do after writing) + this.readyToSend = isActive(); } catch (Throwable ex) { tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); @@ -222,9 +217,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } } - protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException { - } - /** * Handle all requests, except the first one, to receive messages on a SockJS * HTTP transport based session. @@ -251,20 +243,8 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { this.asyncRequestControl = request.getAsyncRequestControl(response); this.asyncRequestControl.start(-1); - if (isStreaming()) { - writePrelude(request, response); - flushCache(); - this.readyToSend = true; - } - else { - if (this.messageCache.isEmpty()) { - scheduleHeartbeat(); - this.readyToSend = true; - } - else { - flushCache(); - } - } + handleRequestInternal(request, response, false); + this.readyToSend = isActive(); } catch (Throwable ex) { tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); @@ -273,6 +253,14 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } } + /** + * Invoked when a SockJS transport request is received. + * @param request the current request + * @param response the current response + * @param initialRequest whether it is the first request for the session + */ + protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response, + boolean initialRequest) throws IOException; @Override protected final void sendMessageInternal(String message) throws SockJsTransportFailureException { @@ -287,23 +275,26 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } cancelHeartbeat(); flushCache(); - return; } else { if (logger.isTraceEnabled()) { logger.trace("Session is not active, not ready to flush."); } - return; } } } /** * Called when the connection is active and ready to write to the response. - * Subclasses should implement but never call this method directly. + * Subclasses should only call this method from a method where the + * "responseLock" is acquired. */ protected abstract void flushCache() throws SockJsTransportFailureException; + + protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException { + } + @Override protected void disconnect(CloseStatus status) { resetRequest(); @@ -341,12 +332,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { logger.trace("Writing to HTTP response: " + formattedFrame); } this.response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET)); - if (isStreaming()) { - this.response.flush(); - } - else { - resetRequest(); - } + this.response.flush(); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java index 48b9b7533b..0ece43cf68 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java @@ -16,8 +16,11 @@ package org.springframework.web.socket.sockjs.transport.session; +import java.io.IOException; import java.util.Map; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.frame.SockJsFrame; @@ -40,11 +43,31 @@ public class PollingSockJsSession extends AbstractHttpSockJsSession { } + /** + * @deprecated as of 4.2 this method is no longer used for anything + */ @Override + @Deprecated protected boolean isStreaming() { return false; } + @Override + protected void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response, + boolean initialRequest) throws IOException { + + if (initialRequest) { + writeFrame(SockJsFrame.openFrame()); + resetRequest(); + } + else if (!getMessageCache().isEmpty()) { + flushCache(); + } + else { + scheduleHeartbeat(); + } + } + @Override protected void flushCache() throws SockJsTransportFailureException { String[] messages = new String[getMessageCache().size()]; @@ -54,6 +77,7 @@ public class PollingSockJsSession extends AbstractHttpSockJsSession { SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec(); SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, messages); writeFrame(frame); + resetRequest(); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java index 06caafe088..a63f1898b3 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java @@ -16,8 +16,11 @@ package org.springframework.web.socket.sockjs.transport.session; +import java.io.IOException; import java.util.Map; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.frame.SockJsFrame; @@ -42,11 +45,26 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession { } + /** + * @deprecated as of 4.2 this method is no longer used for anything + */ @Override + @Deprecated protected boolean isStreaming() { return true; } + @Override + protected void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response, + boolean initialRequest) throws IOException { + + writePrelude(request, response); + if (initialRequest) { + writeFrame(SockJsFrame.openFrame()); + } + flushCache(); + } + @Override protected void flushCache() throws SockJsTransportFailureException { while (!getMessageCache().isEmpty()) { diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java index be7f1664f0..4427db6637 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java @@ -102,7 +102,7 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests