Polishing

This commit is contained in:
Juergen Hoeller 2017-01-23 21:28:40 +01:00
parent e94fa3f34d
commit f095aa20eb
3 changed files with 136 additions and 142 deletions

View File

@ -55,21 +55,21 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* connection to the broker is opened and used exclusively for all messages from the
* client that originated the CONNECT message. Messages from the same client are
* identified through the session id message header. Reversely, when the STOMP broker
* sends messages back on the TCP connection, those messages are enriched with the session
* id of the client and sent back downstream through the {@link MessageChannel} provided
* to the constructor.
* sends messages back on the TCP connection, those messages are enriched with the
* session id of the client and sent back downstream through the {@link MessageChannel}
* provided to the constructor.
*
* <p>This class also automatically opens a default "system" TCP connection to the message
* broker that is used for sending messages that originate from the server application (as
* opposed to from a client). Such messages are not associated with any client and
* therefore do not have a session id header. The "system" connection is effectively
* shared and cannot be used to receive messages. Several properties are provided to
* configure the "system" connection including:
* <p>This class also automatically opens a default "system" TCP connection to the
* message broker that is used for sending messages that originate from the server
* application (as opposed to from a client). Such messages are not associated with
* any client and therefore do not have a session id header. The "system" connection
* is effectively shared and cannot be used to receive messages. Several properties
* are provided to configure the "system" connection including:
* <ul>
* <li>{@link #setSystemLogin(String)}</li>
* <li>{@link #setSystemPasscode(String)}</li>
* <li>{@link #setSystemHeartbeatSendInterval(long)}</li>
* <li>{@link #setSystemHeartbeatReceiveInterval(long)}</li>
* <li>{@link #setSystemLogin}</li>
* <li>{@link #setSystemPasscode}</li>
* <li>{@link #setSystemHeartbeatSendInterval}</li>
* <li>{@link #setSystemHeartbeatReceiveInterval}</li>
* </ul>
*
* @author Rossen Stoyanchev
@ -80,23 +80,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public static final String SYSTEM_SESSION_ID = "_system_";
// STOMP recommends error of margin for receiving heartbeats
private static final long HEARTBEAT_MULTIPLIER = 3;
/**
* A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings
* we need. If we don't receive CONNECTED within a minute, the connection is closed proactively.
*/
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable());
// STOMP recommends error of margin for receiving heartbeats
private static final long HEARTBEAT_MULTIPLIER = 3;
private static final Message<byte[]> HEARTBEAT_MESSAGE;
/**
* A heartbeat is setup once a CONNECTED frame is received which contains
* the heartbeat settings we need. If we don't receive CONNECTED within
* a minute, the connection is closed proactively.
*/
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
static {
EMPTY_TASK.run();
@ -121,19 +119,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private long systemHeartbeatReceiveInterval = 10000;
private String virtualHost;
private final Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4);
private String virtualHost;
private TcpOperations<byte[]> tcpClient;
private MessageHeaderInitializer headerInitializer;
private final Map<String, StompConnectionHandler> connectionHandlers =
new ConcurrentHashMap<>();
private final Stats stats = new Stats();
private final Map<String, StompConnectionHandler> connectionHandlers = new ConcurrentHashMap<>();
/**
* Create a StompBrokerRelayMessageHandler instance with the given message channels
@ -179,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public int getRelayPort() {
return this.relayPort;
}
/**
* Set the interval, in milliseconds, at which the "system" connection will, in the
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value
* of zero will prevent heartbeats from being sent to the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
return this.systemHeartbeatSendInterval;
}
/**
* Set the maximum interval, in milliseconds, at which the "system" connection
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the connection to expect not to receive
* heartbeats from the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
return this.systemHeartbeatReceiveInterval;
}
/**
* Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients.
@ -294,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.systemPasscode;
}
/**
* Set the interval, in milliseconds, at which the "system" connection will, in the
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value
* of zero will prevent heartbeats from being sent to the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
return this.systemHeartbeatSendInterval;
}
/**
* Set the maximum interval, in milliseconds, at which the "system" connection
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the connection to expect not to receive
* heartbeats from the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
return this.systemHeartbeatReceiveInterval;
}
/**
* Configure one more destinations to subscribe to on the shared "system"
* connection along with MessageHandler's to handle received messages.
@ -336,28 +333,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Configure a TCP client for managing TCP connections to the STOMP broker.
* By default {@link ReactorNettyTcpClient} is used.
* <p>By default {@link ReactorNettyTcpClient} is used.
*/
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
}
/**
* Get the configured TCP client. Never {@code null} unless not configured
* Get the configured TCP client (never {@code null} unless not configured
* invoked and this method is invoked before the handler is started and
* hence a default implementation initialized.
* hence a default implementation initialized).
*/
public TcpOperations<byte[]> getTcpClient() {
return this.tcpClient;
}
/**
* Return the current count of TCP connection to the broker.
*/
public int getConnectionCount() {
return this.connectionHandlers.size();
}
/**
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
* messages created through the {@code StompBrokerRelayMessageHandler} that
@ -382,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.stats.toString();
}
/**
* Return the current count of TCP connection to the broker.
*/
public int getConnectionCount() {
return this.connectionHandlers.size();
}
@Override
protected void startInternal() {
@ -872,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private class SystemStompConnectionHandler extends StompConnectionHandler {
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
@ -971,10 +969,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private static class VoidCallable implements Callable<Void> {
@Override
public Void call() throws Exception {
public Void call() {
return null;
}
}
@ -1001,10 +1000,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
public String toString() {
return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
(isBrokerAvailable() ? " (available)" : " (not available)") +
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -91,7 +91,6 @@ public abstract class AbstractWebSocketSession<T> implements NativeWebSocketSess
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
checkNativeSessionInitialized();
if (logger.isTraceEnabled()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -34,7 +34,6 @@ import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsFrameType;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
/**
@ -44,20 +43,19 @@ import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
* Sub-classes implement actual send as well as disconnect logic.
*
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4.1
*/
public abstract class AbstractClientSockJsSession implements WebSocketSession {
protected final Log logger = LogFactory.getLog(getClass());
private final TransportRequest request;
private final WebSocketHandler webSocketHandler;
private final SettableListenableFuture<WebSocketSession> connectFuture;
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private volatile State state = State.NEW;
@ -127,25 +125,31 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
@Override
public boolean isOpen() {
return State.OPEN.equals(this.state);
return (this.state == State.OPEN);
}
public boolean isDisconnected() {
return (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state));
return (this.state == State.CLOSING || this.state == State.CLOSED);
}
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
Assert.state(State.OPEN.equals(this.state), this + " is not open, current state=" + this.state);
Assert.isInstanceOf(TextMessage.class, message, this + " supports text messages only.");
String payload = ((TextMessage) message).getPayload();
payload = getMessageCodec().encode(new String[] { payload });
payload = payload.substring(1); // the client-side doesn't need message framing (letter "a")
message = new TextMessage(payload);
if (logger.isTraceEnabled()) {
logger.trace("Sending message " + message + " in " + this);
if (!(message instanceof TextMessage)) {
throw new IllegalArgumentException(this + " supports text messages only.");
}
sendInternal((TextMessage) message);
if (this.state != State.OPEN) {
throw new IllegalStateException(this + " is not open: current state " + this.state);
}
String payload = ((TextMessage) message).getPayload();
payload = getMessageCodec().encode(payload);
payload = payload.substring(1); // the client-side doesn't need message framing (letter "a")
TextMessage messageToSend = new TextMessage(payload);
if (logger.isTraceEnabled()) {
logger.trace("Sending message " + messageToSend + " in " + this);
}
sendInternal(messageToSend);
}
protected abstract void sendInternal(TextMessage textMessage) throws IOException;
@ -173,10 +177,13 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
logger.warn("Ignoring close since connect() was never invoked");
return;
}
if (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state)) {
logger.debug("Ignoring close (already closing or closed), current state=" + this.state);
if (isDisconnected()) {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring close (already closing or closed): current state " + this.state);
}
return;
}
this.state = State.CLOSING;
this.closeStatus = status;
try {
@ -193,23 +200,20 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
public void handleFrame(String payload) {
SockJsFrame frame = new SockJsFrame(payload);
if (SockJsFrameType.OPEN.equals(frame.getType())) {
handleOpenFrame();
}
else if (SockJsFrameType.MESSAGE.equals(frame.getType())) {
handleMessageFrame(frame);
}
else if (SockJsFrameType.CLOSE.equals(frame.getType())) {
handleCloseFrame(frame);
}
else if (SockJsFrameType.HEARTBEAT.equals(frame.getType())) {
if (logger.isTraceEnabled()) {
logger.trace("Received heartbeat in " + this);
}
}
else {
// should never happen
throw new IllegalStateException("Unknown SockJS frame type " + frame + " in " + this);
switch (frame.getType()) {
case OPEN:
handleOpenFrame();
break;
case HEARTBEAT:
if (logger.isTraceEnabled()) {
logger.trace("Received heartbeat in " + this);
}
break;
case MESSAGE:
handleMessageFrame(frame);
break;
case CLOSE:
handleCloseFrame(frame);
}
}
@ -217,7 +221,7 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
if (logger.isDebugEnabled()) {
logger.debug("Processing SockJS open frame in " + this);
}
if (State.NEW.equals(state)) {
if (this.state == State.NEW) {
this.state = State.OPEN;
try {
this.webSocketHandler.afterConnectionEstablished(this);
@ -225,16 +229,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
}
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
Class<?> type = this.webSocketHandler.getClass();
logger.error(type + ".afterConnectionEstablished threw exception in " + this, ex);
logger.error("WebSocketHandler.afterConnectionEstablished threw exception in " + this, ex);
}
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Open frame received in " + getId() + " but we're not" +
"connecting (current state=" + this.state + "). The server might " +
"have been restarted and lost track of the session.");
logger.debug("Open frame received in " + getId() + " but we're not connecting (current state " +
this.state + "). The server might have been restarted and lost track of the session.");
}
closeInternal(new CloseStatus(1006, "Server lost session"));
}
@ -243,10 +245,11 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
private void handleMessageFrame(SockJsFrame frame) {
if (!isOpen()) {
if (logger.isErrorEnabled()) {
logger.error("Ignoring received message due to state=" + this.state + " in " + this);
logger.error("Ignoring received message due to state " + this.state + " in " + this);
}
return;
}
String[] messages;
try {
messages = getMessageCodec().decode(frame.getFrameData());
@ -258,18 +261,18 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
closeInternal(CloseStatus.BAD_DATA);
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Processing SockJS message frame " + frame.getContent() + " in " + this);
}
for (String message : messages) {
try {
if (isOpen()) {
if (isOpen()) {
try {
this.webSocketHandler.handleMessage(this, new TextMessage(message));
}
}
catch (Throwable ex) {
Class<?> type = this.webSocketHandler.getClass();
logger.error(type + ".handleMessage threw an exception on " + frame + " in " + this, ex);
catch (Throwable ex) {
logger.error("WebSocketHandler.handleMessage threw an exception on " + frame + " in " + this, ex);
}
}
}
}
@ -300,18 +303,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
}
this.webSocketHandler.handleTransportError(this, error);
}
catch (Exception ex) {
Class<?> type = this.webSocketHandler.getClass();
if (logger.isErrorEnabled()) {
logger.error(type + ".handleTransportError threw an exception", ex);
}
catch (Throwable ex) {
logger.error("WebSocketHandler.handleTransportError threw an exception", ex);
}
}
public void afterTransportClosed(CloseStatus closeStatus) {
this.closeStatus = (this.closeStatus != null ? this.closeStatus : closeStatus);
Assert.state(this.closeStatus != null, "CloseStatus not available");
if (logger.isDebugEnabled()) {
logger.debug("Transport closed with " + this.closeStatus + " in " + this);
}
@ -320,11 +319,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
try {
this.webSocketHandler.afterConnectionClosed(this, this.closeStatus);
}
catch (Exception ex) {
if (logger.isErrorEnabled()) {
Class<?> type = this.webSocketHandler.getClass();
logger.error(type + ".afterConnectionClosed threw an exception", ex);
}
catch (Throwable ex) {
logger.error("WebSocketHandler.afterConnectionClosed threw an exception", ex);
}
}