SubProtocolWebSocketHandler provides protected decorateSession method

Issue: SPR-16089
This commit is contained in:
Juergen Hoeller 2017-10-21 23:02:54 +02:00
parent d418ba1b5d
commit 5809f5b8eb
3 changed files with 48 additions and 34 deletions

View File

@ -126,7 +126,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
String text = String.format("Another send already in progress: " + String text = String.format("Another send already in progress: " +
"session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes", "session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes",
getId(), getTimeSinceSendStarted(), this.bufferSize.get()); getId(), getTimeSinceSendStarted(), getBufferSize());
logger.trace(text); logger.trace(text);
} }
checkSessionLimits(); checkSessionLimits();
@ -166,14 +166,14 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
private void checkSessionLimits() { private void checkSessionLimits() {
if (!shouldNotSend() && this.closeLock.tryLock()) { if (!shouldNotSend() && this.closeLock.tryLock()) {
try { try {
if (getTimeSinceSendStarted() > this.sendTimeLimit) { if (getTimeSinceSendStarted() > getSendTimeLimit()) {
String format = "Message send time %d (ms) for session '%s' exceeded the allowed limit %d"; String format = "Message send time %d (ms) for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, getTimeSinceSendStarted(), getId(), this.sendTimeLimit); String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit());
limitExceeded(reason); limitExceeded(reason);
} }
else if (this.bufferSize.get() > this.bufferSizeLimit) { else if (getBufferSize() > getBufferSizeLimit()) {
String format = "The send buffer size %d bytes for session '%s' exceeded the allowed limit %d"; String format = "The send buffer size %d bytes for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, this.bufferSize.get(), getId(), this.bufferSizeLimit); String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit());
limitExceeded(reason); limitExceeded(reason);
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -26,24 +26,23 @@ import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
/** /**
* A contract for handling WebSocket messages as part of a higher level protocol, referred * A contract for handling WebSocket messages as part of a higher level protocol,
* to as "sub-protocol" in the WebSocket RFC specification. Handles both * referred to as "sub-protocol" in the WebSocket RFC specification. Handles both
* {@link WebSocketMessage}s from a client as well as {@link Message}s to a client. * {@link WebSocketMessage}s from a client as well as {@link Message}s to a client.
* <p> *
* Implementations of this interface can be configured on a * <p>Implementations of this interface can be configured on a
* {@link SubProtocolWebSocketHandler} which selects a sub-protocol handler to delegate * {@link SubProtocolWebSocketHandler} which selects a sub-protocol handler to
* messages to based on the sub-protocol requested by the client through the * delegate messages to based on the sub-protocol requested by the client through
* {@code Sec-WebSocket-Protocol} request header. * the {@code Sec-WebSocket-Protocol} request header.
* *
* @author Andy Wilkinson * @author Andy Wilkinson
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
*
* @since 4.0 * @since 4.0
*/ */
public interface SubProtocolHandler { public interface SubProtocolHandler {
/** /**
* Return the list of sub-protocols supported by this handler, never {@code null}. * Return the list of sub-protocols supported by this handler (never {@code null}).
*/ */
List<String> getSupportedProtocols(); List<String> getSupportedProtocols();
@ -53,12 +52,11 @@ public interface SubProtocolHandler {
* @param message the client message * @param message the client message
* @param outputChannel an output channel to send messages to * @param outputChannel an output channel to send messages to
*/ */
void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
MessageChannel outputChannel) throws Exception; throws Exception;
/** /**
* Handle the given {@link Message} to the client associated with the given WebSocket * Handle the given {@link Message} to the client associated with the given WebSocket session.
* session.
* @param session the client session * @param session the client session
* @param message the client message * @param message the client message
*/ */
@ -84,7 +82,7 @@ public interface SubProtocolHandler {
* @param closeStatus the reason why the session was closed * @param closeStatus the reason why the session was closed
* @param outputChannel a channel * @param outputChannel a channel
*/ */
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel)
MessageChannel outputChannel) throws Exception; throws Exception;
} }

View File

@ -16,7 +16,6 @@
package org.springframework.web.socket.messaging; package org.springframework.web.socket.messaging;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -292,7 +291,7 @@ public class SubProtocolWebSocketHandler
return; return;
} }
this.stats.incrementSessionCount(session); this.stats.incrementSessionCount(session);
session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit()); session = decorateSession(session);
this.sessions.put(session.getId(), new WebSocketSessionHolder(session)); this.sessions.put(session.getId(), new WebSocketSessionHolder(session));
findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel); findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel);
} }
@ -377,6 +376,23 @@ public class SubProtocolWebSocketHandler
} }
/**
* Decorate the given {@link WebSocketSession}, if desired.
* <p>The default implementation builds a {@link ConcurrentWebSocketSessionDecorator}
* with the configured {@link #getSendTimeLimit() send-time limit} and
* {@link #getSendBufferSizeLimit() buffer-size limit}.
* @param session the original {@code WebSocketSession}
* @return the decorated {@code WebSocketSession}, or potentially the given session as-is
* @since 4.3.13
*/
protected WebSocketSession decorateSession(WebSocketSession session) {
return new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit());
}
/**
* Find a {@link SubProtocolHandler} for the given session.
* @param session the {@code WebSocketSession} to find a handler for
*/
protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) { protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) {
String protocol = null; String protocol = null;
try { try {
@ -432,12 +448,11 @@ public class SubProtocolWebSocketHandler
* When a session is connected through a higher-level protocol it has a chance * When a session is connected through a higher-level protocol it has a chance
* to use heartbeat management to shut down sessions that are too slow to send * to use heartbeat management to shut down sessions that are too slow to send
* or receive messages. However, after a WebSocketSession is established and * or receive messages. However, after a WebSocketSession is established and
* before the higher level protocol is fully connected there is a possibility * before the higher level protocol is fully connected there is a possibility for
* for sessions to hang. This method checks and closes any sessions that have * sessions to hang. This method checks and closes any sessions that have been
* been connected for more than 60 seconds without having received a single * connected for more than 60 seconds without having received a single message.
* message.
*/ */
private void checkSessions() throws IOException { private void checkSessions() {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
if (!isRunning() || (currentTime - this.lastSessionCheckTime < TIME_TO_FIRST_MESSAGE)) { if (!isRunning() || (currentTime - this.lastSessionCheckTime < TIME_TO_FIRST_MESSAGE)) {
return; return;
@ -497,12 +512,13 @@ public class SubProtocolWebSocketHandler
private final WebSocketSession session; private final WebSocketSession session;
private final long createTime = System.currentTimeMillis(); private final long createTime;
private volatile boolean handledMessages; private volatile boolean hasHandledMessages;
private WebSocketSessionHolder(WebSocketSession session) { public WebSocketSessionHolder(WebSocketSession session) {
this.session = session; this.session = session;
this.createTime = System.currentTimeMillis();
} }
public WebSocketSession getSession() { public WebSocketSession getSession() {
@ -514,17 +530,17 @@ public class SubProtocolWebSocketHandler
} }
public void setHasHandledMessages() { public void setHasHandledMessages() {
this.handledMessages = true; this.hasHandledMessages = true;
} }
public boolean hasHandledMessages() { public boolean hasHandledMessages() {
return this.handledMessages; return this.hasHandledMessages;
} }
@Override @Override
public String toString() { public String toString() {
return "WebSocketSessionHolder[session=" + this.session + ", createTime=" + return "WebSocketSessionHolder[session=" + this.session + ", createTime=" +
this.createTime + ", hasHandledMessages=" + this.handledMessages + "]"; this.createTime + ", hasHandledMessages=" + this.hasHandledMessages + "]";
} }
} }