From d49a7a105d0b4a19abce778d128f35d3452e558d Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 8 Oct 2020 18:38:05 +0100 Subject: [PATCH] Fix failing tests from Reactor snapshot changes See gh-25884 --- .../result/method/SyncInvocableHandlerMethod.java | 2 +- .../web/reactive/socket/WebSocketIntegrationTests.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java index b43bf7149a..ece6a29b3f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java @@ -102,7 +102,7 @@ public class SyncInvocableHandlerMethod extends HandlerMethod { public HandlerResult invokeForHandlerResult(ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) { - MonoProcessor processor = MonoProcessor.fromSink(Sinks.one()); + MonoProcessor processor = MonoProcessor.fromSink(Sinks.unsafe().one()); this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor); if (processor.isTerminated()) { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index 656d9081f7..be5471c6a1 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -99,7 +99,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { String protocol = "echo-v1"; AtomicReference infoRef = new AtomicReference<>(); - MonoProcessor output = MonoProcessor.fromSink(Sinks.one()); + MonoProcessor output = MonoProcessor.fromSink(Sinks.unsafe().one()); this.client.execute(getUrl("/sub-protocol"), new WebSocketHandler() { @@ -132,7 +132,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { HttpHeaders headers = new HttpHeaders(); headers.add("my-header", "my-value"); - MonoProcessor output = MonoProcessor.fromSink(Sinks.one()); + MonoProcessor output = MonoProcessor.fromSink(Sinks.unsafe().one()); this.client.execute(getUrl("/custom-header"), headers, session -> session.receive() @@ -148,7 +148,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { void sessionClosing(WebSocketClient client, HttpServer server, Class serverConfigClass) throws Exception { startServer(client, server, serverConfigClass); - MonoProcessor statusProcessor = MonoProcessor.fromSink(Sinks.one()); + MonoProcessor statusProcessor = MonoProcessor.fromSink(Sinks.unsafe().one()); this.client.execute(getUrl("/close"), session -> { logger.debug("Starting.."); @@ -169,7 +169,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { void cookie(WebSocketClient client, HttpServer server, Class serverConfigClass) throws Exception { startServer(client, server, serverConfigClass); - MonoProcessor output = MonoProcessor.fromSink(Sinks.one()); + MonoProcessor output = MonoProcessor.fromSink(Sinks.unsafe().one()); AtomicReference cookie = new AtomicReference<>(); this.client.execute(getUrl("/cookie"), session -> {