From f44366877c571f2c7f270b4eef6c72dc8c050bfb Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 22 Nov 2017 17:37:11 -0500 Subject: [PATCH] Switch to suspended mode before demand After this commit, Tomcat and Undertow WebSocketSession imlpementations start out in suspended mode and wait for demand. The JettyWebSocketSession is capable of suspending but it doesn't seem to work if invoked before any messages are received. That may become an issue if there is a case where no demand appears long enough for more messages to accumulate than we can hold. UnderowServerHttpRequest would ideally also start in suspended mode but that also doesn't work. It is not an issue in this case since we can ignore the read notifications. Servlet API requires a proactive check before it calls you back so there is no need to suspend. Issue: SPR-16207 --- .../socket/adapter/AbstractListenerWebSocketSession.java | 7 ++++++- .../web/reactive/socket/adapter/JettyWebSocketSession.java | 2 ++ .../reactive/socket/adapter/TomcatWebSocketSession.java | 1 + .../reactive/socket/adapter/UndertowWebSocketSession.java | 1 + 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 3cd34f9e969..8b99dae89b0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -128,6 +128,8 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc /** * Whether the underlying WebSocket API has flow control and can suspend and * resume the receiving of messages. + *

Note: Sub-classes are encouraged to start out in + * suspended mode, if possible, and wait until demand is received. */ protected abstract boolean canSuspendReceiving(); @@ -238,7 +240,10 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc } void handleMessage(WebSocketMessage webSocketMessage) { - this.pendingMessages.offer(webSocketMessage); + if (!this.pendingMessages.offer(webSocketMessage)) { + throw new IllegalStateException("Too many messages received. " + + "Please ensure WebSocketSession.receive() is subscribed to."); + } onDataAvailable(); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 449394b781b..0e16d70e6ac 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -57,6 +57,8 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession completionMono) { super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono); + // TODO: suspend causes failures if invoked at this stage + // suspendReceiving(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index 926977243f8..c966300d8d1 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -49,6 +49,7 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { MonoProcessor completionMono) { super(session, info, factory, completionMono); + suspendReceiving(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index a78555c7e9e..122c7c50a6f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -53,6 +53,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession completionMono) { super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionMono); + suspendReceiving(); }