Remove isStreaming flag from AbstractHttpSockJsSession

This change removes the need for the isStreaming field from the base
class AbstractHttpSockJsSession. This field was used to account for
differences between polling vs streaming SockJS sessions without having
to expose to sub-classes private fields that are otherwise protected
from concurrent access by the base class. The change manages to delegate
to sub-classes without providing direct access to protected fields.

Issue: SPR-12427
This commit is contained in:
Rossen Stoyanchev 2014-12-09 09:47:36 -05:00
parent 7a6dfe3765
commit 43d93712f1
6 changed files with 71 additions and 39 deletions

View File

@ -14,6 +14,8 @@ import java.util.Set;
*/ */
public interface UserSessionRegistry { public interface UserSessionRegistry {
/** /**
* Return the active session id's for the given user. * Return the active session id's for the given user.
* @param user the user * @param user the user

View File

@ -168,9 +168,9 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
} }
/** /**
* Whether this HTTP transport streams message frames vs closing the response * @deprecated as of 4.2 this method is no longer used for anything
* after each frame written (long polling).
*/ */
@Deprecated
protected abstract boolean isStreaming(); protected abstract boolean isStreaming();
@ -205,15 +205,10 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
// Let "our" handler know before sending the open frame to the remote handler // Let "our" handler know before sending the open frame to the remote handler
delegateConnectionEstablished(); delegateConnectionEstablished();
if (isStreaming()) { handleRequestInternal(request, response, true);
writePrelude(request, response);
writeFrame(SockJsFrame.openFrame()); // Request might have been reset (e.g. polling sessions do after writing)
flushCache(); this.readyToSend = isActive();
this.readyToSend = true;
}
else {
writeFrame(SockJsFrame.openFrame());
}
} }
catch (Throwable ex) { catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
@ -222,9 +217,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
} }
} }
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
}
/** /**
* Handle all requests, except the first one, to receive messages on a SockJS * Handle all requests, except the first one, to receive messages on a SockJS
* HTTP transport based session. * HTTP transport based session.
@ -251,20 +243,8 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
this.asyncRequestControl = request.getAsyncRequestControl(response); this.asyncRequestControl = request.getAsyncRequestControl(response);
this.asyncRequestControl.start(-1); this.asyncRequestControl.start(-1);
if (isStreaming()) { handleRequestInternal(request, response, false);
writePrelude(request, response); this.readyToSend = isActive();
flushCache();
this.readyToSend = true;
}
else {
if (this.messageCache.isEmpty()) {
scheduleHeartbeat();
this.readyToSend = true;
}
else {
flushCache();
}
}
} }
catch (Throwable ex) { catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
@ -273,6 +253,14 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
} }
} }
/**
* Invoked when a SockJS transport request is received.
* @param request the current request
* @param response the current response
* @param initialRequest whether it is the first request for the session
*/
protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
boolean initialRequest) throws IOException;
@Override @Override
protected final void sendMessageInternal(String message) throws SockJsTransportFailureException { protected final void sendMessageInternal(String message) throws SockJsTransportFailureException {
@ -287,23 +275,26 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
} }
cancelHeartbeat(); cancelHeartbeat();
flushCache(); flushCache();
return;
} }
else { else {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Session is not active, not ready to flush."); logger.trace("Session is not active, not ready to flush.");
} }
return;
} }
} }
} }
/** /**
* Called when the connection is active and ready to write to the response. * Called when the connection is active and ready to write to the response.
* Subclasses should implement but never call this method directly. * Subclasses should only call this method from a method where the
* "responseLock" is acquired.
*/ */
protected abstract void flushCache() throws SockJsTransportFailureException; protected abstract void flushCache() throws SockJsTransportFailureException;
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
}
@Override @Override
protected void disconnect(CloseStatus status) { protected void disconnect(CloseStatus status) {
resetRequest(); resetRequest();
@ -341,13 +332,8 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
logger.trace("Writing to HTTP response: " + formattedFrame); logger.trace("Writing to HTTP response: " + formattedFrame);
} }
this.response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET)); this.response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET));
if (isStreaming()) {
this.response.flush(); this.response.flush();
} }
else {
resetRequest();
}
}
} }
} }

View File

@ -16,8 +16,11 @@
package org.springframework.web.socket.sockjs.transport.session; package org.springframework.web.socket.sockjs.transport.session;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame; import org.springframework.web.socket.sockjs.frame.SockJsFrame;
@ -40,11 +43,31 @@ public class PollingSockJsSession extends AbstractHttpSockJsSession {
} }
/**
* @deprecated as of 4.2 this method is no longer used for anything
*/
@Override @Override
@Deprecated
protected boolean isStreaming() { protected boolean isStreaming() {
return false; return false;
} }
@Override
protected void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
boolean initialRequest) throws IOException {
if (initialRequest) {
writeFrame(SockJsFrame.openFrame());
resetRequest();
}
else if (!getMessageCache().isEmpty()) {
flushCache();
}
else {
scheduleHeartbeat();
}
}
@Override @Override
protected void flushCache() throws SockJsTransportFailureException { protected void flushCache() throws SockJsTransportFailureException {
String[] messages = new String[getMessageCache().size()]; String[] messages = new String[getMessageCache().size()];
@ -54,6 +77,7 @@ public class PollingSockJsSession extends AbstractHttpSockJsSession {
SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec(); SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec();
SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, messages); SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, messages);
writeFrame(frame); writeFrame(frame);
resetRequest();
} }
} }

View File

@ -16,8 +16,11 @@
package org.springframework.web.socket.sockjs.transport.session; package org.springframework.web.socket.sockjs.transport.session;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame; import org.springframework.web.socket.sockjs.frame.SockJsFrame;
@ -42,11 +45,26 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession {
} }
/**
* @deprecated as of 4.2 this method is no longer used for anything
*/
@Override @Override
@Deprecated
protected boolean isStreaming() { protected boolean isStreaming() {
return true; return true;
} }
@Override
protected void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
boolean initialRequest) throws IOException {
writePrelude(request, response);
if (initialRequest) {
writeFrame(SockJsFrame.openFrame());
}
flushCache();
}
@Override @Override
protected void flushCache() throws SockJsTransportFailureException { protected void flushCache() throws SockJsTransportFailureException {
while (!getMessageCache().isEmpty()) { while (!getMessageCache().isEmpty()) {

View File

@ -102,7 +102,7 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr
} }
static class TestAbstractHttpSockJsSession extends AbstractHttpSockJsSession { static class TestAbstractHttpSockJsSession extends StreamingSockJsSession {
private IOException exceptionOnWriteFrame; private IOException exceptionOnWriteFrame;

View File

@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
@ -30,7 +32,7 @@ import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
/** /**
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
*/ */
public class TestHttpSockJsSession extends AbstractHttpSockJsSession { public class TestHttpSockJsSession extends StreamingSockJsSession {
private boolean active; private boolean active;