diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 3cd34f9e969..8b99dae89b0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -128,6 +128,8 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc /** * Whether the underlying WebSocket API has flow control and can suspend and * resume the receiving of messages. + *

Note: Sub-classes are encouraged to start out in + * suspended mode, if possible, and wait until demand is received. */ protected abstract boolean canSuspendReceiving(); @@ -238,7 +240,10 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc } void handleMessage(WebSocketMessage webSocketMessage) { - this.pendingMessages.offer(webSocketMessage); + if (!this.pendingMessages.offer(webSocketMessage)) { + throw new IllegalStateException("Too many messages received. " + + "Please ensure WebSocketSession.receive() is subscribed to."); + } onDataAvailable(); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 449394b781b..0e16d70e6ac 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -57,6 +57,8 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession completionMono) { super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono); + // TODO: suspend causes failures if invoked at this stage + // suspendReceiving(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index 926977243f8..c966300d8d1 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -49,6 +49,7 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { MonoProcessor completionMono) { super(session, info, factory, completionMono); + suspendReceiving(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index a78555c7e9e..122c7c50a6f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -53,6 +53,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession completionMono) { super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionMono); + suspendReceiving(); }