Add STOMP/WebSocket stats collection

This change adds collection of stats in key infrastructure components
of the WebSocket message broker config setup and exposes the gathered
information for logging and viewing (e.g. via JMX).

WebSocketMessageBrokerStats is a single class that assembles all
gathered information and by default logs it once every 15 minutes.
Application can also easily expose to JMX through an MBeanExporter.

A new section in the reference documentation provides a summary of
the available information.

Issue: SPR-11739
This commit is contained in:
Rossen Stoyanchev 2014-07-04 15:08:52 -04:00
parent b78b2e9a03
commit ab4864da2a
10 changed files with 523 additions and 16 deletions

View File

@ -21,6 +21,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@ -118,6 +119,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private final Map<String, StompConnectionHandler> connectionHandlers =
new ConcurrentHashMap<String, StompConnectionHandler>();
private final Stats stats = new Stats();
/**
* Create a StompBrokerRelayMessageHandler instance with the given message channels
@ -352,6 +355,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.headerInitializer;
}
/**
* Return a String describing internal state and counters.
*/
public String getStatsInfo() {
return this.stats.toString();
}
@Override
protected void startInternal() {
@ -380,6 +390,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
SystemStompConnectionHandler handler = new SystemStompConnectionHandler(headers);
this.connectionHandlers.put(handler.getSessionId(), handler);
this.stats.incrementConnectCount();
this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
}
@ -469,6 +480,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
this.connectionHandlers.put(sessionId, handler);
this.stats.incrementConnectCount();
this.tcpClient.connect(handler);
}
else if (StompCommand.DISCONNECT.equals(command)) {
@ -479,6 +491,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
return;
}
stats.incrementDisconnectCount();
handler.forward(message, stompAccessor);
}
else {
@ -615,6 +628,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
*/
protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
this.isStompConnected = true;
stats.incrementConnectedCount();
initHeartbeats(connectedHeaders);
}
@ -877,4 +891,33 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private class Stats {
private final AtomicInteger connect = new AtomicInteger();
private final AtomicInteger connected = new AtomicInteger();
private final AtomicInteger disconnect = new AtomicInteger();
public void incrementConnectCount() {
this.connect.incrementAndGet();
}
public void incrementConnectedCount() {
this.connected.incrementAndGet();
}
public void incrementDisconnectCount() {
this.disconnect.incrementAndGet();
}
public String toString() {
return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
(isBrokerAvailable() ? " (available)" : " (not available)") +
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
}
}
}

View File

@ -80,7 +80,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Before
public void setUp() throws Exception {
logger.debug("Setting up '" + this.testName.getMethodName() + "'");
logger.debug("Setting up before '" + this.testName.getMethodName() + "'");
this.port = SocketUtils.findAvailableTcpPort(61613);
this.responseChannel = new ExecutorSubscribableChannel();
this.responseHandler = new TestMessageHandler();
@ -116,6 +116,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@After
public void tearDown() throws Exception {
try {
logger.debug("STOMP broker relay stats: " + this.relay.getStatsInfo());
this.relay.stop();
}
finally {

View File

@ -128,13 +128,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
beanName = registerBeanDef(beanDef, parserCxt, source);
RuntimeBeanReference userSessionRegistry = new RuntimeBeanReference(beanName);
RuntimeBeanReference subProtocolWsHandler = registerSubProtocolWebSocketHandler(
RuntimeBeanReference subProtocolHandlerDef = registerSubProtocolWebSocketHandler(
element, clientInChannel, clientOutChannel, userSessionRegistry, parserCxt, source);
for(Element stompEndpointElem : DomUtils.getChildElementsByTagName(element, "stomp-endpoint")) {
RuntimeBeanReference httpRequestHandler = registerHttpRequestHandler(
stompEndpointElem, subProtocolWsHandler, parserCxt, source);
stompEndpointElem, subProtocolHandlerDef, parserCxt, source);
String pathAttribute = stompEndpointElem.getAttribute("path");
Assert.state(StringUtils.hasText(pathAttribute), "Invalid <stomp-endpoint> (no path mapping)");
@ -155,7 +155,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
beanName = "brokerChannel";
channelElem = DomUtils.getChildElementByTagName(element, "broker-channel");
RuntimeBeanReference brokerChannel = getMessageChannel(beanName, channelElem, parserCxt, source);
registerMessageBroker(element, clientInChannel, clientOutChannel, brokerChannel, parserCxt, source);
RootBeanDefinition brokerDef = registerMessageBroker(element, clientInChannel,
clientOutChannel, brokerChannel, parserCxt, source);
RuntimeBeanReference messageConverter = registerBrokerMessageConverter(element, parserCxt, source);
@ -176,6 +177,9 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
scopeConfigurerDef.getPropertyValues().add("scopes", scopeMap);
registerBeanDefByName("webSocketScopeConfigurer", scopeConfigurerDef, parserCxt, source);
registerWebSocketMessageBrokerStats(subProtocolHandlerDef, brokerDef, clientInChannel,
clientOutChannel, parserCxt, source);
parserCxt.popAndRegisterContainingComponent();
return null;
@ -309,7 +313,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
return new RuntimeBeanReference(httpRequestHandlerBeanName);
}
private void registerMessageBroker(Element messageBrokerElement, RuntimeBeanReference clientInChannelDef,
private RootBeanDefinition registerMessageBroker(Element messageBrokerElement, RuntimeBeanReference clientInChannelDef,
RuntimeBeanReference clientOutChannelDef, RuntimeBeanReference brokerChannelDef,
ParserContext parserCxt, Object source) {
@ -321,15 +325,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
cavs.addIndexedArgumentValue(1, clientOutChannelDef);
cavs.addIndexedArgumentValue(2, brokerChannelDef);
RootBeanDefinition brokerDef;
if (simpleBrokerElem != null) {
String prefix = simpleBrokerElem.getAttribute("prefix");
cavs.addIndexedArgumentValue(3, Arrays.asList(StringUtils.tokenizeToStringArray(prefix, ",")));
RootBeanDefinition brokerDef = new RootBeanDefinition(SimpleBrokerMessageHandler.class, cavs, null);
registerBeanDef(brokerDef, parserCxt, source);
brokerDef = new RootBeanDefinition(SimpleBrokerMessageHandler.class, cavs, null);
}
else if (brokerRelayElem != null) {
String prefix = brokerRelayElem.getAttribute("prefix");
cavs.addIndexedArgumentValue(3, Arrays.asList(StringUtils.tokenizeToStringArray(prefix, ",")));
@ -370,12 +372,15 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
if(!attrValue.isEmpty()) {
mpvs.add("virtualHost", attrValue);
}
Class<?> handlerType = StompBrokerRelayMessageHandler.class;
RootBeanDefinition messageBrokerDef = new RootBeanDefinition(handlerType, cavs, mpvs);
registerBeanDef(messageBrokerDef, parserCxt, source);
brokerDef = new RootBeanDefinition(handlerType, cavs, mpvs);
}
else {
// Should not happen
throw new IllegalStateException("Neither <simple-broker> nor <stomp-broker-relay> elements found.");
}
registerBeanDef(brokerDef, parserCxt, source);
return brokerDef;
}
private RuntimeBeanReference registerBrokerMessageConverter(Element element,
@ -407,8 +412,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
ConstructorArgumentValues cavs = new ConstructorArgumentValues();
cavs.addIndexedArgumentValue(0, convertersDef);
RootBeanDefinition brokerMessage = new RootBeanDefinition(CompositeMessageConverter.class, cavs, null);
return new RuntimeBeanReference(registerBeanDef(brokerMessage, parserCxt, source));
RootBeanDefinition messageConverterDef = new RootBeanDefinition(CompositeMessageConverter.class, cavs, null);
return new RuntimeBeanReference(registerBeanDef(messageConverterDef, parserCxt, source));
}
private RuntimeBeanReference registerBrokerMessagingTemplate(
@ -481,6 +486,37 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
return new RuntimeBeanReference(userDestinationMessageHandleName);
}
private void registerWebSocketMessageBrokerStats(RuntimeBeanReference subProtocolHandlerDef,
RootBeanDefinition brokerDef, RuntimeBeanReference clientInChannel,
RuntimeBeanReference clientOutChannel, ParserContext parserCxt, Object source) {
RootBeanDefinition statsDef = new RootBeanDefinition(WebSocketMessageBrokerStats.class);
statsDef.getPropertyValues().add("subProtocolWebSocketHandler", subProtocolHandlerDef);
if (StompBrokerRelayMessageHandler.class.equals(brokerDef.getBeanClass())) {
statsDef.getPropertyValues().add("stompBrokerRelay", brokerDef);
}
String beanName = clientInChannel.getBeanName() + "Executor";
if (parserCxt.getRegistry().containsBeanDefinition(beanName)) {
BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName);
statsDef.getPropertyValues().add("inboundChannelExecutor", beanDef);
}
beanName = clientOutChannel.getBeanName() + "Executor";
if (parserCxt.getRegistry().containsBeanDefinition(beanName)) {
BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName);
statsDef.getPropertyValues().add("outboundChannelExecutor", beanDef);
}
beanName = SOCKJS_SCHEDULER_BEAN_NAME;
if (parserCxt.getRegistry().containsBeanDefinition(beanName)) {
BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName);
statsDef.getPropertyValues().add("sockJsTaskScheduler", beanDef);
}
registerBeanDefByName("webSocketMessageBrokerStats", statsDef, parserCxt, source);
}
private static String registerBeanDef(RootBeanDefinition beanDef, ParserContext parserCxt, Object source) {
String beanName = parserCxt.getReaderContext().generateBeanName(beanDef);

View File

@ -0,0 +1,196 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.config;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* A central class for aggregating information about internal state and counters
* from key infrastructure components of the setup that comes with
* {@code @EnableWebSocketMessageBroker} for Java config and
* {@code <websocket:message-broker>} for XML.
*
* <p>By default aggregated information is logged every 15 minutes at INFO level.
* The frequency of logging can be changed via {@link #setLoggingPeriod(long)}.
*
* <p>This class is declared as a Spring bean by the above configuration with the
* name "webSocketMessageBrokerStats" and can be easily exported to JMX, e.g. with
* the {@link org.springframework.jmx.export.MBeanExporter MBeanExporter}.
*
* @author Rossen Stoyanchev
* @since 4.1
*/
public class WebSocketMessageBrokerStats {
private static Log logger = LogFactory.getLog(WebSocketMessageBrokerStats.class);
private SubProtocolWebSocketHandler webSocketHandler;
private StompSubProtocolHandler stompSubProtocolHandler;
private StompBrokerRelayMessageHandler stompBrokerRelay;
private ThreadPoolExecutor inboundChannelExecutor;
private ThreadPoolExecutor outboundChannelExecutor;
private ScheduledThreadPoolExecutor sockJsTaskScheduler;
private ScheduledFuture<?> loggingTask;
private long loggingPeriod = 15 * 60 * 1000;
public void setSubProtocolWebSocketHandler(SubProtocolWebSocketHandler webSocketHandler) {
this.webSocketHandler = webSocketHandler;
this.stompSubProtocolHandler = initStompSubProtocolHandler();
}
private StompSubProtocolHandler initStompSubProtocolHandler() {
for (SubProtocolHandler handler : this.webSocketHandler.getProtocolHandlers()) {
if (handler instanceof StompSubProtocolHandler) {
return (StompSubProtocolHandler) handler;
}
}
SubProtocolHandler defaultHandler = this.webSocketHandler.getDefaultProtocolHandler();
if (defaultHandler != null && defaultHandler instanceof StompSubProtocolHandler) {
return (StompSubProtocolHandler) defaultHandler;
}
return null;
}
public void setStompBrokerRelay(StompBrokerRelayMessageHandler stompBrokerRelay) {
this.stompBrokerRelay = stompBrokerRelay;
}
public void setInboundChannelExecutor(ThreadPoolTaskExecutor inboundChannelExecutor) {
this.inboundChannelExecutor = inboundChannelExecutor.getThreadPoolExecutor();
}
public void setOutboundChannelExecutor(ThreadPoolTaskExecutor outboundChannelExecutor) {
this.outboundChannelExecutor = outboundChannelExecutor.getThreadPoolExecutor();
}
public void setSockJsTaskScheduler(ThreadPoolTaskScheduler sockJsTaskScheduler) {
this.sockJsTaskScheduler = sockJsTaskScheduler.getScheduledThreadPoolExecutor();
this.loggingTask = initLoggingTask(3 * 60 * 1000);
}
private ScheduledFuture<?> initLoggingTask(long initialDelay) {
if (logger.isInfoEnabled() && this.loggingPeriod > 0) {
return this.sockJsTaskScheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
logger.info(WebSocketMessageBrokerStats.this.toString());
}
}, initialDelay, this.loggingPeriod, TimeUnit.MILLISECONDS);
}
return null;
}
/**
* Set the frequency for logging information at INFO level in milliseconds.
* If set 0 or less than 0, the logging task is cancelled.
* <p>By default this property is set to 30 minutes (30 * 60 * 1000).
*/
public void setLoggingPeriod(long period) {
if (this.loggingTask != null) {
this.loggingTask.cancel(true);
}
this.loggingPeriod = period;
this.loggingTask = initLoggingTask(0);
}
/**
* Return the configured logging period frequency in milliseconds.
*/
public long getLoggingPeriod() {
return this.loggingPeriod;
}
/**
* Get stats about WebSocket sessions.
*/
public String getWebSocketSessionStatsInfo() {
return (this.webSocketHandler != null ? this.webSocketHandler.getStatsInfo() : "null");
}
/**
* Get stats about STOMP-related WebSocket message processing.
*/
public String getStompSubProtocolStatsInfo() {
return (this.stompSubProtocolHandler != null ? this.stompSubProtocolHandler.getStatsInfo() : "null");
}
/**
* Get stats about STOMP broker relay (when using a full-featured STOMP broker).
*/
public String getStompBrokerRelayStatsInfo() {
return (this.stompBrokerRelay != null ? this.stompBrokerRelay.getStatsInfo() : "null");
}
/**
* Get stats about the executor processing incoming messages from WebSocket clients.
*/
public String getClientInboundExecutorStatsInfo() {
return (this.inboundChannelExecutor != null ? getExecutorStatsInfo(this.inboundChannelExecutor) : "null");
}
/**
* Get stats about the executor processing outgoing messages to WebSocket clients.
*/
public String getClientOutboundExecutorStatsInfo() {
return (this.outboundChannelExecutor != null ? getExecutorStatsInfo(this.outboundChannelExecutor) : "null");
}
/**
* Get stats about the SockJS task scheduler.
*/
public String getSockJsTaskSchedulerStatsInfo() {
return (this.sockJsTaskScheduler != null ? getExecutorStatsInfo(this.sockJsTaskScheduler) : "null");
}
private String getExecutorStatsInfo(Executor executor) {
String s = executor.toString();
return s.substring(s.indexOf("pool"), s.length() - 1);
}
public String toString() {
return "WebSocketSession[" + getWebSocketSessionStatsInfo() + "]" +
", stompSubProtocol[" + getStompSubProtocolStatsInfo() + "]" +
", stompBrokerRelay[" + getStompBrokerRelayStatsInfo() + "]" +
", inboundChannel[" + getClientInboundExecutorStatsInfo() + "]" +
", outboundChannel" + getClientOutboundExecutorStatsInfo() + "]" +
", sockJsScheduler[" + getSockJsTaskSchedulerStatsInfo() + "]";
}
}

View File

@ -22,9 +22,11 @@ import org.springframework.beans.factory.config.CustomScopeConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.simp.SimpSessionScope;
import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.WebSocketMessageBrokerStats;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
/**
@ -47,6 +49,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
protected WebSocketMessageBrokerConfigurationSupport() {
}
@Bean
public HandlerMapping stompWebSocketHandlerMapping() {
@ -109,4 +112,22 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
return configurer;
}
@Bean
public WebSocketMessageBrokerStats webSocketMessageBrokerStats() {
StompBrokerRelayMessageHandler brokerRelay =
stompBrokerRelayMessageHandler() instanceof StompBrokerRelayMessageHandler ?
(StompBrokerRelayMessageHandler) stompBrokerRelayMessageHandler() : null;
// Ensure STOMP endpoints are registered
stompWebSocketHandlerMapping();
WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats();
stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler());
stats.setStompBrokerRelay(brokerRelay);
stats.setInboundChannelExecutor(clientInboundChannelExecutor());
stats.setOutboundChannelExecutor(clientOutboundChannelExecutor());
stats.setSockJsTaskScheduler(messageBrokerSockJsTaskScheduler());
return stats;
}
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -99,6 +100,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
private ApplicationEventPublisher eventPublisher;
private final Stats stats = new Stats();
/**
* Configure the maximum size allowed for an incoming STOMP message.
@ -167,6 +170,13 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
this.eventPublisher = applicationEventPublisher;
}
/**
* Return a String describing internal state and counters.
*/
public String getStatsInfo() {
return this.stats.toString();
}
/**
* Handle incoming WebSocket messages from clients.
@ -221,8 +231,12 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
if (this.eventPublisher != null) {
if (StompCommand.CONNECT.equals(headerAccessor.getCommand())) {
this.stats.incrementConnectCount();
publishEvent(new SessionConnectEvent(this, message));
}
else if (StompCommand.DISCONNECT.equals(headerAccessor.getCommand())) {
this.stats.incrementDisconnectCount();
}
else if (StompCommand.SUBSCRIBE.equals(headerAccessor.getCommand())) {
publishEvent(new SessionSubscribeEvent(this, message));
}
@ -298,6 +312,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
}
}
else if (StompCommand.CONNECTED.equals(command)) {
this.stats.incrementConnectedCount();
stompAccessor = afterStompSessionConnected(message, stompAccessor, session);
if (this.eventPublisher != null && StompCommand.CONNECTED.equals(command)) {
publishEvent(new SessionConnectedEvent(this, (Message<byte[]>) message));
@ -467,4 +482,33 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
return MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
}
private class Stats {
private final AtomicInteger connect = new AtomicInteger();
private final AtomicInteger connected = new AtomicInteger();
private final AtomicInteger disconnect = new AtomicInteger();
public void incrementConnectCount() {
this.connect.incrementAndGet();
}
public void incrementConnectedCount() {
this.connected.incrementAndGet();
}
public void incrementDisconnectCount() {
this.disconnect.incrementAndGet();
}
public String toString() {
return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
@ -44,6 +45,8 @@ import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import org.springframework.web.socket.handler.SessionLimitExceededException;
import org.springframework.web.socket.sockjs.transport.session.PollingSockJsSession;
import org.springframework.web.socket.sockjs.transport.session.StreamingSockJsSession;
/**
* An implementation of {@link WebSocketHandler} that delegates incoming WebSocket
@ -97,6 +100,8 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
private final ReentrantLock sessionCheckLock = new ReentrantLock();
private final Stats stats = new Stats();
private final Object lifecycleMonitor = new Object();
private volatile boolean running = false;
@ -214,6 +219,14 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
}
}
/**
* Return a String describing internal state and counters.
*/
public String getStatsInfo() {
return this.stats.toString();
}
@Override
public final void start() {
Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers");
@ -249,6 +262,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
this.stats.incrementSessionCount(session);
session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit());
this.sessions.put(session.getId(), new WebSocketSessionHolder(session));
if (logger.isDebugEnabled()) {
@ -323,6 +337,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
catch (SessionLimitExceededException ex) {
try {
logger.error("Terminating '" + session + "'", ex);
this.stats.incrementLimitExceededCount();
clearSession(session, ex.getStatus()); // clear first, session may be unresponsive
session.close(ex.getStatus());
}
@ -381,6 +396,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
"Closing " + holder.getSession() + ".");
}
try {
this.stats.incrementNoMessagesReceivedCount();
session.close(CloseStatus.SESSION_NOT_RELIABLE);
}
catch (Throwable t) {
@ -396,6 +412,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
this.stats.incrementTransportError();
}
@Override
@ -407,7 +424,9 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
if (logger.isDebugEnabled()) {
logger.debug("Clearing session " + session.getId() + " (" + this.sessions.size() + " remain)");
}
this.sessions.remove(session.getId());
if (this.sessions.remove(session.getId()) != null) {
this.stats.decrementSessionCount(session);
}
findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.clientInboundChannel);
}
@ -453,4 +472,67 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
}
}
private class Stats {
private final AtomicInteger total = new AtomicInteger();
private final AtomicInteger webSocket = new AtomicInteger();
private final AtomicInteger httpStreaming = new AtomicInteger();
private final AtomicInteger httpPolling = new AtomicInteger();
private final AtomicInteger limitExceeded = new AtomicInteger();
private final AtomicInteger noMessagesReceived = new AtomicInteger();
private final AtomicInteger transportError = new AtomicInteger();
public void incrementSessionCount(WebSocketSession session) {
getCountFor(session).incrementAndGet();
this.total.incrementAndGet();
}
public void decrementSessionCount(WebSocketSession session) {
getCountFor(session).decrementAndGet();
}
public void incrementLimitExceededCount() {
this.limitExceeded.incrementAndGet();
}
public void incrementNoMessagesReceivedCount() {
this.noMessagesReceived.incrementAndGet();
}
public void incrementTransportError() {
this.transportError.incrementAndGet();
}
private AtomicInteger getCountFor(WebSocketSession session) {
if (session instanceof PollingSockJsSession) {
return this.httpPolling;
}
else if (session instanceof StreamingSockJsSession) {
return this.httpStreaming;
}
else {
return this.webSocket;
}
}
public String toString() {
return SubProtocolWebSocketHandler.this.sessions.size() +
" current WS(" + this.webSocket.get() +
")-HttpStream(" + this.httpStreaming.get() +
")-HttpPoll(" + this.httpPolling.get() + "), " +
this.total.get() + " total, " +
(this.limitExceeded.get() + this.noMessagesReceived.get()) + " closed abnormally (" +
this.noMessagesReceived.get() + " connect failure, " +
this.limitExceeded.get() + " send limit, " +
this.transportError.get() + " transport error)";
}
}
}

View File

@ -245,6 +245,17 @@ public class MessageBrokerBeanDefinitionParserTests {
catch (NoSuchBeanDefinitionException ex) {
// expected
}
String name = "webSocketMessageBrokerStats";
WebSocketMessageBrokerStats stats = this.appContext.getBean(name, WebSocketMessageBrokerStats.class);
assertEquals("WebSocketSession[0 current WS(0)-HttpStream(0)-HttpPoll(0), " +
"0 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], " +
"stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " +
"stompBrokerRelay[0 sessions, relayhost:1234 (not available), processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " +
"inboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " +
"outboundChannelpool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " +
"sockJsScheduler[pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 0]",
stats.toString());
}
@Test

View File

@ -44,6 +44,7 @@ import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.config.WebSocketMessageBrokerStats;
import org.springframework.web.socket.handler.TestWebSocketSession;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.web.socket.messaging.StompTextMessageBuilder;
@ -51,6 +52,7 @@ import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
/**
* Test fixture for
@ -134,6 +136,20 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
assertTrue(executor.getRemoveOnCancelPolicy());
}
@Test
public void webSocketMessageBrokerStats() {
String name = "webSocketMessageBrokerStats";
WebSocketMessageBrokerStats stats = this.config.getBean(name, WebSocketMessageBrokerStats.class);
assertEquals("WebSocketSession[0 current WS(0)-HttpStream(0)-HttpPoll(0), " +
"0 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], " +
"stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " +
"stompBrokerRelay[null], " +
"inboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " +
"outboundChannelpool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " +
"sockJsScheduler[pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 0]",
stats.toString());
}
@Controller
static class TestController {

View File

@ -38692,6 +38692,63 @@ through any other application instances.
[[websocket-stomp-stats]]
==== Runtime Monitoring
When using `@EnableWebSocketMessageBroker` or `<websocket:message-broker>` key
infrastructure components automatically gather stats and counters that provide
important insight into the internal state of the application. The configuration
also declares a bean of type `WebSocketMessageBrokerStats` that gathers all
available information in one place and by default logs it at INFO once
every 15 minutes. This bean can be exported to JMX through Spring's
`MBeanExporter` for viewing at runtime for example through JDK's jconsole.
Below is a summary of the available information.
Client WebSocket Sessions::
Current::: indicates how many client sessions there are
currently with the count further broken down by WebSocket vs HTTP
streaming and polling SockJS sessions.
Total::: indicates how many total sessions have been established.
Abnormally Closed:::
Connect Failures:::: these are sessions that got established but were
closed after not having received any messages within 60 seconds. This is
usually an indication of proxy or network issues.
Send Limit Exceeded:::: sessions closed after exceeding the configured send
timeout or the send buffer limits which can occur with slow clients
(see previous section).
Transport Errors:::: sessions closed after a transport error such as
failure to read or write to a WebSocket connection or
HTTP request/response.
STOMP Frames::: the total number of CONNECT, CONNECTED, and DISCONNECT frames
processed indicating how many clients connected on the STOMP level. Note that
the DISCONNECT count may be lower when sessions get closed abnormally or when
clients close without sending a DISCONNECT frame.
STOMP Broker Relay::
TCP Connections::: indicates how many TCP connections on behalf of client
WebSocket sessions are established to the broker. This should be equal to the
number of client WebSocket sessions + 1 additional shared "system" connection
for sending messages from within the application.
STOMP Frames::: the total number of CONNECT, CONNECTED, and DISCONNECT frames
forwarded to or received from the broker on behalf of clients. Note that a
DISCONNECT frame is sent to the broker regardless of how the client WebSocket
session was closed. Therefore a lower DISCONNECT frame count is an indication
that the broker is pro-actively closing connections, may be because of a
heartbeat that didn't arrive in time, an invalid input frame, or other.
Client Inbound Channel:: stats from thread pool backing the "clientInboundChannel"
providing insight into the health of incoming message processing. Tasks queueing
up here is an indication the application may be too slow to handle messages.
If there I/O bound tasks (e.g. slow database query, HTTP request to 3rd party
REST API, etc) consider increasing the thread pool size.
Client Outbound Channel:: stats from the thread pool backing the "clientOutboundChannel"
providing insight into the health of broadcasting messages to clients. Tasks
queueing up here is an indication clients are too slow to consume messages.
One way to address this is to increase the thread pool size to accommodate the
number of concurrent slow clients expected. Another option is to reduce the
send timeout and send buffer size limits (see the previous section).
SockJS Task Scheduler:: stats from thread pool of the SockJS task scheduler which
is used to send heartbeats. Note that when heartbeats are negotiated on the
STOMP level the SockJS heartbeats are disabled.
[[websocket-stomp-testing]]
==== Testing Annotated Controller Methods