diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java index 0fa659141b..f0b3ea2961 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java @@ -281,6 +281,10 @@ class ChannelSendOperator extends Mono implements Scannable { return; } synchronized (this) { + if (this.state == State.READY_TO_WRITE) { + s.request(n); + return; + } if (this.writeSubscriber != null) { if (this.state == State.EMITTING_CACHED_SIGNALS) { this.demandBeforeReadyToWrite = n; diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index 4cba68c3bf..63c4033a78 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -273,6 +273,10 @@ public class ChannelSendOperator extends Mono implements Scannable { return; } synchronized (this) { + if (this.state == State.READY_TO_WRITE) { + s.request(n); + return; + } if (this.writeSubscriber != null) { if (this.state == State.EMITTING_CACHED_SIGNALS) { this.demandBeforeReadyToWrite = n;