Improve shutdown responsiveness of SubProtocolWSH

Proactively notify all active WebSocket sessions when a shutdown is
progress. Sessions then can ignore further attempts to send messages
and also stop stop trying to flush messages right away.
This commit is contained in:
Rossen Stoyanchev 2014-03-27 11:47:48 -04:00
parent ea1e27efa2
commit ace6bd2418
2 changed files with 29 additions and 7 deletions

View File

@ -18,7 +18,6 @@ package org.springframework.web.socket.handler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
@ -62,6 +61,8 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
private volatile boolean limitExceeded;
private volatile boolean shutDownInProgress;
private final Lock flushLock = new ReentrantLock();
@ -87,7 +88,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
public void sendMessage(WebSocketMessage<?> message) throws IOException {
if (this.limitExceeded) {
if (isDisabled()) {
return;
}
@ -105,15 +106,19 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
break;
}
}
while (!this.buffer.isEmpty());
while (!this.buffer.isEmpty() && !isDisabled());
}
private boolean isDisabled() {
return (this.limitExceeded || this.shutDownInProgress);
}
private boolean tryFlushMessageBuffer() throws IOException {
if (this.flushLock.tryLock() && !this.limitExceeded) {
if (this.flushLock.tryLock()) {
try {
while (true) {
WebSocketMessage<?> messageToSend = this.buffer.poll();
if (messageToSend == null) {
if (messageToSend == null || isDisabled()) {
break;
}
this.bufferSize.addAndGet(messageToSend.getPayloadLength() * -1);
@ -132,7 +137,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
}
private void checkSessionLimits() throws IOException {
if (this.closeLock.tryLock() && !this.limitExceeded) {
if (!isDisabled() && this.closeLock.tryLock()) {
try {
if (getTimeSinceSendStarted() > this.sendTimeLimit) {
@ -161,4 +166,9 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
throw new SessionLimitExceededException(reason, status);
}
@Override
public void close(CloseStatus status) throws IOException {
this.shutDownInProgress = true;
super.close(status);
}
}

View File

@ -16,6 +16,7 @@
package org.springframework.web.socket.messaging;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@ -221,8 +222,19 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
@Override
public final void stop() {
synchronized (this.lifecycleMonitor) {
this.running = false;
this.clientOutboundChannel.unsubscribe(this);
// Notify sessions to stop flushing messages
for (WebSocketSession session : this.sessions.values()) {
try {
session.close(CloseStatus.GOING_AWAY);
}
catch (Throwable t) {
logger.error("Failed to close session id '" + session.getId() + "': " + t.getMessage());
}
}
}
}
@ -298,7 +310,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
WebSocketSession session = this.sessions.get(sessionId);
if (session == null) {
logger.error("Session not found for session with id " + sessionId);
logger.error("Session not found for session with id '" + sessionId + "', ignoring message " + message);
return;
}