From ace6bd2418cba892f793e9e3666ac02a541074c7 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 27 Mar 2014 11:47:48 -0400 Subject: [PATCH] Improve shutdown responsiveness of SubProtocolWSH Proactively notify all active WebSocket sessions when a shutdown is progress. Sessions then can ignore further attempts to send messages and also stop stop trying to flush messages right away. --- .../ConcurrentWebSocketSessionDecorator.java | 22 ++++++++++++++----- .../SubProtocolWebSocketHandler.java | 14 +++++++++++- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java index c00dd32a2a7..5a32c0d36e7 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java @@ -18,7 +18,6 @@ package org.springframework.web.socket.handler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.util.Assert; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; @@ -62,6 +61,8 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat private volatile boolean limitExceeded; + private volatile boolean shutDownInProgress; + private final Lock flushLock = new ReentrantLock(); @@ -87,7 +88,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat public void sendMessage(WebSocketMessage message) throws IOException { - if (this.limitExceeded) { + if (isDisabled()) { return; } @@ -105,15 +106,19 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat break; } } - while (!this.buffer.isEmpty()); + while (!this.buffer.isEmpty() && !isDisabled()); + } + + private boolean isDisabled() { + return (this.limitExceeded || this.shutDownInProgress); } private boolean tryFlushMessageBuffer() throws IOException { - if (this.flushLock.tryLock() && !this.limitExceeded) { + if (this.flushLock.tryLock()) { try { while (true) { WebSocketMessage messageToSend = this.buffer.poll(); - if (messageToSend == null) { + if (messageToSend == null || isDisabled()) { break; } this.bufferSize.addAndGet(messageToSend.getPayloadLength() * -1); @@ -132,7 +137,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat } private void checkSessionLimits() throws IOException { - if (this.closeLock.tryLock() && !this.limitExceeded) { + if (!isDisabled() && this.closeLock.tryLock()) { try { if (getTimeSinceSendStarted() > this.sendTimeLimit) { @@ -161,4 +166,9 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat throw new SessionLimitExceededException(reason, status); } + @Override + public void close(CloseStatus status) throws IOException { + this.shutDownInProgress = true; + super.close(status); + } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java index a60d63299f2..e85f0eadfbc 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java @@ -16,6 +16,7 @@ package org.springframework.web.socket.messaging; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -221,8 +222,19 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public final void stop() { synchronized (this.lifecycleMonitor) { + this.running = false; this.clientOutboundChannel.unsubscribe(this); + + // Notify sessions to stop flushing messages + for (WebSocketSession session : this.sessions.values()) { + try { + session.close(CloseStatus.GOING_AWAY); + } + catch (Throwable t) { + logger.error("Failed to close session id '" + session.getId() + "': " + t.getMessage()); + } + } } } @@ -298,7 +310,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, WebSocketSession session = this.sessions.get(sessionId); if (session == null) { - logger.error("Session not found for session with id " + sessionId); + logger.error("Session not found for session with id '" + sessionId + "', ignoring message " + message); return; }