Fix failing tests from Reactor snapshot changes

See gh-25884
This commit is contained in:
Rossen Stoyanchev 2020-10-08 18:38:05 +01:00
parent 24bd0148d5
commit d49a7a105d
2 changed files with 5 additions and 5 deletions

View File

@ -102,7 +102,7 @@ public class SyncInvocableHandlerMethod extends HandlerMethod {
public HandlerResult invokeForHandlerResult(ServerWebExchange exchange,
BindingContext bindingContext, Object... providedArgs) {
MonoProcessor<HandlerResult> processor = MonoProcessor.fromSink(Sinks.one());
MonoProcessor<HandlerResult> processor = MonoProcessor.fromSink(Sinks.unsafe().one());
this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor);
if (processor.isTerminated()) {

View File

@ -99,7 +99,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
String protocol = "echo-v1";
AtomicReference<HandshakeInfo> infoRef = new AtomicReference<>();
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.one());
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.unsafe().one());
this.client.execute(getUrl("/sub-protocol"),
new WebSocketHandler() {
@ -132,7 +132,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
HttpHeaders headers = new HttpHeaders();
headers.add("my-header", "my-value");
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.one());
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.unsafe().one());
this.client.execute(getUrl("/custom-header"), headers,
session -> session.receive()
@ -148,7 +148,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
void sessionClosing(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception {
startServer(client, server, serverConfigClass);
MonoProcessor<CloseStatus> statusProcessor = MonoProcessor.fromSink(Sinks.one());
MonoProcessor<CloseStatus> statusProcessor = MonoProcessor.fromSink(Sinks.unsafe().one());
this.client.execute(getUrl("/close"),
session -> {
logger.debug("Starting..");
@ -169,7 +169,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
void cookie(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception {
startServer(client, server, serverConfigClass);
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.one());
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.unsafe().one());
AtomicReference<String> cookie = new AtomicReference<>();
this.client.execute(getUrl("/cookie"),
session -> {