diff --git a/spring-core/src/main/java/org/springframework/core/codec/Decoder.java b/spring-core/src/main/java/org/springframework/core/codec/Decoder.java index 51a943e213a..7370a143a67 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/Decoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/Decoder.java @@ -18,12 +18,12 @@ package org.springframework.core.codec; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Sinks; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; @@ -93,16 +93,21 @@ public interface Decoder { default T decode(DataBuffer buffer, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map hints) throws DecodingException { - MonoProcessor processor = MonoProcessor.fromSink(Sinks.one()); - decodeToMono(Mono.just(buffer), targetType, mimeType, hints).subscribeWith(processor); + CompletableFuture future = decodeToMono(Mono.just(buffer), targetType, mimeType, hints).toFuture(); + Assert.state(future.isDone(), "DataBuffer decoding should have completed."); - Assert.state(processor.isTerminated(), "DataBuffer decoding should have completed."); - Throwable ex = processor.getError(); - if (ex != null) { - throw (ex instanceof CodecException ? (CodecException) ex : - new DecodingException("Failed to decode: " + ex.getMessage(), ex)); + Throwable failure; + try { + return future.get(); } - return processor.peek(); + catch (ExecutionException ex) { + failure = ex.getCause(); + } + catch (InterruptedException ex) { + failure = ex; + } + throw (failure instanceof CodecException ? (CodecException) failure : + new DecodingException("Failed to decode: " + failure.getMessage(), failure)); } /** diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java index 4f6e203e163..7c0e8ed5027 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java @@ -27,8 +27,6 @@ import io.rsocket.frame.FrameType; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; @@ -162,8 +160,9 @@ class MessagingRSocket implements RSocket { ((NettyDataBuffer) dataBuffer).getNativeBuffer().refCnt() : 1; } + @SuppressWarnings("deprecation") private Flux handleAndReply(Payload firstPayload, FrameType frameType, Flux payloads) { - MonoProcessor> replyMono = MonoProcessor.fromSink(Sinks.one()); + reactor.core.publisher.MonoProcessor> replyMono = reactor.core.publisher.MonoProcessor.create(); MessageHeaders headers = createHeaders(firstPayload, frameType, replyMono); AtomicBoolean read = new AtomicBoolean(); @@ -186,8 +185,9 @@ class MessagingRSocket implements RSocket { return PayloadUtils.retainDataAndReleasePayload(payload, this.strategies.dataBufferFactory()); } + @SuppressWarnings("deprecation") private MessageHeaders createHeaders(Payload payload, FrameType frameType, - @Nullable MonoProcessor replyMono) { + @Nullable reactor.core.publisher.MonoProcessor replyMono) { MessageHeaderAccessor headers = new MessageHeaderAccessor(); headers.setLeaveMutable(true); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketPayloadReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketPayloadReturnValueHandler.java index 350026b9f80..5dc9b425ab4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketPayloadReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketPayloadReturnValueHandler.java @@ -21,7 +21,6 @@ import java.util.List; import io.rsocket.Payload; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import org.springframework.core.MethodParameter; import org.springframework.core.ReactiveAdapterRegistry; @@ -36,7 +35,7 @@ import org.springframework.util.Assert; /** * Extension of {@link AbstractEncoderMethodReturnValueHandler} that * {@link #handleEncodedContent handles} encoded content by wrapping data buffers - * as RSocket payloads and by passing those to the {@link MonoProcessor} + * as RSocket payloads and by passing those to the {@link reactor.core.publisher.MonoProcessor} * from the {@link #RESPONSE_HEADER} header. * * @author Rossen Stoyanchev @@ -45,7 +44,7 @@ import org.springframework.util.Assert; public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodReturnValueHandler { /** - * Message header name that is expected to have a {@link MonoProcessor} + * Message header name that is expected to have a {@link reactor.core.publisher.MonoProcessor} * which will receive the {@code Flux} that represents the response. */ public static final String RESPONSE_HEADER = "rsocketResponse"; @@ -57,11 +56,11 @@ public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodRetur @Override - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) protected Mono handleEncodedContent( Flux encodedContent, MethodParameter returnType, Message message) { - MonoProcessor> replyMono = getReplyMono(message); + reactor.core.publisher.MonoProcessor> replyMono = getReplyMono(message); Assert.notNull(replyMono, "Missing '" + RESPONSE_HEADER + "'"); replyMono.onNext(encodedContent.map(PayloadUtils::createPayload)); replyMono.onComplete(); @@ -69,8 +68,9 @@ public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodRetur } @Override + @SuppressWarnings("deprecation") protected Mono handleNoContent(MethodParameter returnType, Message message) { - MonoProcessor> replyMono = getReplyMono(message); + reactor.core.publisher.MonoProcessor> replyMono = getReplyMono(message); if (replyMono != null) { replyMono.onComplete(); } @@ -78,11 +78,11 @@ public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodRetur } @Nullable - @SuppressWarnings("unchecked") - private MonoProcessor> getReplyMono(Message message) { + @SuppressWarnings({"unchecked", "deprecation"}) + private reactor.core.publisher.MonoProcessor> getReplyMono(Message message) { Object headerValue = message.getHeaders().get(RESPONSE_HEADER); - Assert.state(headerValue == null || headerValue instanceof MonoProcessor, "Expected MonoProcessor"); - return (MonoProcessor>) headerValue; + Assert.state(headerValue == null || headerValue instanceof reactor.core.publisher.MonoProcessor, "Expected MonoProcessor"); + return (reactor.core.publisher.MonoProcessor>) headerValue; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 95579c4cb37..0bb08d36ac9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -19,8 +19,8 @@ package org.springframework.messaging.tcp.reactor; import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import io.netty.buffer.ByteBuf; @@ -33,7 +33,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -53,6 +52,7 @@ import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; +import org.springframework.util.concurrent.CompletableToListenableFutureAdapter; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.MonoToListenableFutureAdapter; import org.springframework.util.concurrent.SettableListenableFuture; @@ -205,13 +205,13 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } // Report first connect to the ListenableFuture - MonoProcessor connectMono = MonoProcessor.fromSink(Sinks.one()); + CompletableFuture connectFuture = new CompletableFuture<>(); this.tcpClient .handle(new ReactorNettyHandler(handler)) .connect() - .doOnNext(updateConnectMono(connectMono)) - .doOnError(updateConnectMono(connectMono)) + .doOnNext(conn -> connectFuture.complete(null)) + .doOnError(connectFuture::completeExceptionally) .doOnError(handler::afterConnectFailure) // report all connect failures to the handler .flatMap(Connection::onDispose) // post-connect issues .retryWhen(Retry.from(signals -> signals @@ -222,7 +222,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ .flatMap(attempt -> reconnect(attempt, strategy))) .subscribe(); - return new MonoToListenableFutureAdapter<>(connectMono); + return new CompletableToListenableFutureAdapter<>(connectFuture); } private ListenableFuture handleShuttingDownConnectFailure(TcpConnectionHandler

handler) { @@ -231,19 +231,6 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return new MonoToListenableFutureAdapter<>(Mono.error(ex)); } - private Consumer updateConnectMono(MonoProcessor connectMono) { - return o -> { - if (!connectMono.isTerminated()) { - if (o instanceof Throwable) { - connectMono.onError((Throwable) o); - } - else { - connectMono.onComplete(); - } - } - }; - } - private Publisher reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) { Long time = reconnectStrategy.getTimeToNextAttempt(attempt); return (time != null ? Mono.delay(Duration.ofMillis(time), this.scheduler) : Mono.empty()); @@ -316,8 +303,8 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ logger.debug("Connected to " + conn.address()); } }); - MonoProcessor completion = MonoProcessor.fromSink(Sinks.one()); - TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); + Sinks.Empty completionSink = Sinks.empty(); + TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completionSink); scheduler.schedule(() -> this.connectionHandler.afterConnected(connection)); inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec))); @@ -330,7 +317,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ this.connectionHandler::handleFailure, this.connectionHandler::afterConnectionClosed); - return completion; + return completionSink.asMono(); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 1442f83a424..3243f6493db 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -18,7 +18,7 @@ package org.springframework.messaging.tcp.reactor; import io.netty.buffer.ByteBuf; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; @@ -42,16 +42,16 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ private final ReactorNettyCodec

codec; - private final MonoProcessor closeProcessor; + private final Sinks.Empty completionSink; public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound, - ReactorNettyCodec

codec, MonoProcessor closeProcessor) { + ReactorNettyCodec

codec, Sinks.Empty completionSink) { this.inbound = inbound; this.outbound = outbound; this.codec = codec; - this.closeProcessor = closeProcessor; + this.completionSink = completionSink; } @@ -75,7 +75,8 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ @Override public void close() { - this.closeProcessor.onComplete(); + // Ignore result: can't overflow, ok if not first or no one listens + this.completionSink.tryEmitEmpty(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 049e23fdd5b..4c1cc61766c 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -28,7 +28,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; @@ -126,15 +125,15 @@ class RSocketServerToClientIntegrationTests { static class ServerController { // Must be initialized by @Test method... - volatile MonoProcessor result; + volatile Sinks.Empty resultSink; void reset() { - this.result = MonoProcessor.fromSink(Sinks.one()); + this.resultSink = Sinks.empty(); } void await(Duration duration) { - this.result.block(duration); + this.resultSink.asMono().block(duration); } @@ -201,8 +200,8 @@ class RSocketServerToClientIntegrationTests { private void runTest(Runnable testEcho) { Mono.fromRunnable(testEcho) - .doOnError(ex -> result.onError(ex)) - .doOnSuccess(o -> result.onComplete()) + .doOnError(ex -> resultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST)) + .doOnSuccess(o -> resultSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST)) .subscribeOn(Schedulers.boundedElastic()) // StepVerifier will block .subscribe(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java index 8b9dbd47161..7aa9b218937 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java @@ -34,7 +34,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import org.springframework.context.support.StaticApplicationContext; @@ -342,8 +341,8 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/mono"); this.messageHandler.handleMessage(message); - assertThat(controller.monoProcessor).isNotNull(); - controller.monoProcessor.onNext("foo"); + assertThat(controller.oneSink).isNotNull(); + controller.oneSink.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST); verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class)); assertThat(this.payloadCaptor.getValue()).isEqualTo("foo"); } @@ -357,7 +356,7 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/mono"); this.messageHandler.handleMessage(message); - controller.monoProcessor.onError(new IllegalStateException()); + controller.oneSink.emitError(new IllegalStateException(), Sinks.EmitFailureHandler.FAIL_FAST); assertThat(controller.exceptionCaught).isTrue(); } @@ -370,8 +369,8 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/flux"); this.messageHandler.handleMessage(message); - assertThat(controller.fluxSink).isNotNull(); - controller.fluxSink.tryEmitNext("foo"); + assertThat(controller.manySink).isNotNull(); + controller.manySink.tryEmitNext("foo"); verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class)); } @@ -585,22 +584,22 @@ public class SimpAnnotationMethodMessageHandlerTests { @Controller private static class ReactiveController { - private MonoProcessor monoProcessor; + private Sinks.One oneSink; - private Sinks.Many fluxSink; + private Sinks.Many manySink; private boolean exceptionCaught = false; @MessageMapping("mono") public Mono handleMono() { - this.monoProcessor = MonoProcessor.fromSink(Sinks.one()); - return this.monoProcessor; + this.oneSink = Sinks.one(); + return this.oneSink.asMono(); } @MessageMapping("flux") public Flux handleFlux() { - this.fluxSink = Sinks.many().unicast().onBackpressureBuffer(); - return this.fluxSink.asFlux(); + this.manySink = Sinks.many().unicast().onBackpressureBuffer(); + return this.manySink.asFlux(); } @MessageExceptionHandler(IllegalStateException.class) diff --git a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java index f52e8668fa5..6a2c588d0af 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java @@ -25,7 +25,6 @@ import java.util.function.Function; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; @@ -64,11 +63,14 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { public MockServerHttpResponse(DataBufferFactory dataBufferFactory) { super(dataBufferFactory); this.writeHandler = body -> { - // Avoid .then() which causes data buffers to be released - MonoProcessor completion = MonoProcessor.fromSink(Sinks.one()); - this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache(); + // Avoid .then() that causes data buffers to be discarded and released + Sinks.Empty completion = Sinks.unsafe().empty(); + this.body = body + .doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized + .doOnError(completion::tryEmitError) + .cache(); this.body.subscribe(); - return completion; + return completion.asMono(); }; } diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java index 63930333394..23b05bcb27d 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; @@ -84,8 +83,8 @@ public class HttpHandlerConnector implements ClientHttpConnector { private Mono doConnect( HttpMethod httpMethod, URI uri, Function> requestCallback) { - MonoProcessor requestWriteCompletion = MonoProcessor.fromSink(Sinks.one()); - MonoProcessor handlerCompletion = MonoProcessor.fromSink(Sinks.one()); + Sinks.Empty requestWriteCompletion = Sinks.empty(); + Sinks.Empty handlerCompletion = Sinks.empty(); ClientHttpResponse[] savedResponse = new ClientHttpResponse[1]; MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri); @@ -95,7 +94,10 @@ public class HttpHandlerConnector implements ClientHttpConnector { log("Invoking HttpHandler for ", httpMethod, uri); ServerHttpRequest mockServerRequest = adaptRequest(mockClientRequest, requestBody); ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest); - this.handler.handle(mockServerRequest, responseToUse).subscribe(handlerCompletion); + this.handler.handle(mockServerRequest, responseToUse).subscribe( + aVoid -> {}, + handlerCompletion::tryEmitError, // Ignore error: cached + serialized + handlerCompletion::tryEmitEmpty); return Mono.empty(); }); @@ -106,9 +108,12 @@ public class HttpHandlerConnector implements ClientHttpConnector { })); log("Writing client request for ", httpMethod, uri); - requestCallback.apply(mockClientRequest).subscribe(requestWriteCompletion); + requestCallback.apply(mockClientRequest).subscribe( + aVoid -> {}, + requestWriteCompletion::tryEmitError, // Ignore error: cached + serialized + requestWriteCompletion::tryEmitEmpty); - return Mono.when(requestWriteCompletion, handlerCompletion) + return Mono.when(requestWriteCompletion.asMono(), handlerCompletion.asMono()) .onErrorMap(ex -> { ClientHttpResponse response = savedResponse[0]; return response != null ? new FailureAfterResponseCompletedException(response, ex) : ex; diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java index 76329b6a6b0..a492c6a03ec 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java @@ -24,9 +24,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.reactivestreams.Publisher; +import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; @@ -138,7 +138,8 @@ class WiretapConnector implements ClientHttpConnector { private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(); - private final MonoProcessor content = MonoProcessor.fromSink(Sinks.one()); + // unsafe(): we're intercepting, already serialized Publisher signals + private final Sinks.One content = Sinks.unsafe().one(); private boolean hasContentConsumer; @@ -167,7 +168,8 @@ class WiretapConnector implements ClientHttpConnector { .doOnComplete(this::handleOnComplete) : null; if (publisher == null && publisherNested == null) { - this.content.onComplete(); + // Ignore result: OK or not relevant + this.content.tryEmitEmpty(); } } @@ -184,8 +186,8 @@ class WiretapConnector implements ClientHttpConnector { public Mono getContent() { return Mono.defer(() -> { - if (this.content.isTerminated()) { - return this.content; + if (this.content.scan(Scannable.Attr.TERMINATED) == Boolean.TRUE) { + return this.content.asMono(); } if (!this.hasContentConsumer) { // Couple of possible cases: @@ -198,23 +200,21 @@ class WiretapConnector implements ClientHttpConnector { "an error was raised while attempting to produce it.", ex)) .subscribe(); } - return this.content; + return this.content.asMono(); }); } private void handleOnError(Throwable ex) { - if (!this.content.isTerminated()) { - this.content.onError(ex); - } + // Ignore result: OK or not relevant + this.content.tryEmitError(ex); } private void handleOnComplete() { - if (!this.content.isTerminated()) { - byte[] bytes = new byte[this.buffer.readableByteCount()]; - this.buffer.read(bytes); - this.content.onNext(bytes); - } + byte[] bytes = new byte[this.buffer.readableByteCount()]; + this.buffer.read(bytes); + // Ignore result: OK or not relevant + this.content.tryEmitValue(bytes); } } diff --git a/spring-test/src/test/java/org/springframework/test/web/reactive/server/CookieAssertionTests.java b/spring-test/src/test/java/org/springframework/test/web/reactive/server/CookieAssertionTests.java index 426dc7b3445..70a823302fb 100644 --- a/spring-test/src/test/java/org/springframework/test/web/reactive/server/CookieAssertionTests.java +++ b/spring-test/src/test/java/org/springframework/test/web/reactive/server/CookieAssertionTests.java @@ -19,8 +19,7 @@ import java.net.URI; import java.time.Duration; import org.junit.jupiter.api.Test; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Sinks; +import reactor.core.publisher.Mono; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -128,10 +127,9 @@ public class CookieAssertionTests { MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK); response.getCookies().add(cookie.getName(), cookie); - MonoProcessor emptyContent = MonoProcessor.fromSink(Sinks.one()); - emptyContent.onComplete(); + ExchangeResult result = new ExchangeResult( + request, response, Mono.empty(), Mono.empty(), Duration.ZERO, null, null); - ExchangeResult result = new ExchangeResult(request, response, emptyContent, emptyContent, Duration.ZERO, null, null); return new CookieAssertions(result, mock(WebTestClient.ResponseSpec.class)); } diff --git a/spring-test/src/test/java/org/springframework/test/web/reactive/server/HeaderAssertionTests.java b/spring-test/src/test/java/org/springframework/test/web/reactive/server/HeaderAssertionTests.java index dd5cebe838f..b1e6ced4b18 100644 --- a/spring-test/src/test/java/org/springframework/test/web/reactive/server/HeaderAssertionTests.java +++ b/spring-test/src/test/java/org/springframework/test/web/reactive/server/HeaderAssertionTests.java @@ -23,8 +23,7 @@ import java.time.ZonedDateTime; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Sinks; +import reactor.core.publisher.Mono; import org.springframework.http.CacheControl; import org.springframework.http.HttpHeaders; @@ -241,10 +240,9 @@ class HeaderAssertionTests { MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK); response.getHeaders().putAll(responseHeaders); - MonoProcessor emptyContent = MonoProcessor.fromSink(Sinks.one()); - emptyContent.onComplete(); + ExchangeResult result = new ExchangeResult( + request, response, Mono.empty(), Mono.empty(), Duration.ZERO, null, null); - ExchangeResult result = new ExchangeResult(request, response, emptyContent, emptyContent, Duration.ZERO, null, null); return new HeaderAssertions(result, mock(WebTestClient.ResponseSpec.class)); } diff --git a/spring-test/src/test/java/org/springframework/test/web/reactive/server/StatusAssertionTests.java b/spring-test/src/test/java/org/springframework/test/web/reactive/server/StatusAssertionTests.java index c9fd9b68d06..7a301ad9161 100644 --- a/spring-test/src/test/java/org/springframework/test/web/reactive/server/StatusAssertionTests.java +++ b/spring-test/src/test/java/org/springframework/test/web/reactive/server/StatusAssertionTests.java @@ -20,8 +20,7 @@ import java.net.URI; import java.time.Duration; import org.junit.jupiter.api.Test; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Sinks; +import reactor.core.publisher.Mono; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -157,10 +156,9 @@ class StatusAssertionTests { MockClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, URI.create("/")); MockClientHttpResponse response = new MockClientHttpResponse(status); - MonoProcessor emptyContent = MonoProcessor.fromSink(Sinks.one()); - emptyContent.onComplete(); + ExchangeResult result = new ExchangeResult( + request, response, Mono.empty(), Mono.empty(), Duration.ZERO, null, null); - ExchangeResult result = new ExchangeResult(request, response, emptyContent, emptyContent, Duration.ZERO, null, null); return new StatusAssertions(result, mock(WebTestClient.ResponseSpec.class)); } diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java index 6b69119ea38..6d6866fdb34 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java @@ -25,7 +25,6 @@ import java.util.function.Function; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; @@ -64,11 +63,14 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { public MockServerHttpResponse(DataBufferFactory dataBufferFactory) { super(dataBufferFactory); this.writeHandler = body -> { - // Avoid .then() which causes data buffers to be released - MonoProcessor completion = MonoProcessor.fromSink(Sinks.one()); - this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache(); + // Avoid .then() that causes data buffers to be discarded and released + Sinks.Empty completion = Sinks.unsafe().empty(); + this.body = body + .doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized + .doOnError(completion::tryEmitError) + .cache(); this.body.subscribe(); - return completion; + return completion.asMono(); }; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java index ece6a29b3fd..8822bf59dfb 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/SyncInvocableHandlerMethod.java @@ -18,11 +18,10 @@ package org.springframework.web.reactive.result.method; import java.lang.reflect.Method; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Sinks; - import org.springframework.core.DefaultParameterNameDiscoverer; import org.springframework.core.ParameterNameDiscoverer; import org.springframework.lang.Nullable; @@ -102,22 +101,26 @@ public class SyncInvocableHandlerMethod extends HandlerMethod { public HandlerResult invokeForHandlerResult(ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) { - MonoProcessor processor = MonoProcessor.fromSink(Sinks.unsafe().one()); - this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor); + CompletableFuture future = + this.delegate.invoke(exchange, bindingContext, providedArgs).toFuture(); - if (processor.isTerminated()) { - Throwable ex = processor.getError(); - if (ex != null) { - throw (ex instanceof ServerErrorException ? (ServerErrorException) ex : - new ServerErrorException("Failed to invoke: " + getShortLogMessage(), getMethod(), ex)); - } - return processor.peek(); - } - else { - // Should never happen... + if (!future.isDone()) { throw new IllegalStateException( "SyncInvocableHandlerMethod should have completed synchronously."); } + + Throwable failure; + try { + return future.get(); + } + catch (ExecutionException ex) { + failure = ex.getCause(); + } + catch (InterruptedException ex) { + failure = ex; + } + throw (new ServerErrorException( + "Failed to invoke: " + getShortLogMessage(), getMethod(), failure)); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java index d01f173b4c6..da756a45b8b 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,11 @@ public class ErrorsMethodArgumentResolver extends HandlerMethodArgumentResolverS MethodParameter parameter, BindingContext context, ServerWebExchange exchange) { Object errors = getErrors(parameter, context); + + // Initially Errors/BindingResult is a Mono in the model even if it cannot be declared + // as an async argument. That way it can be resolved first while the Mono can complete + // later at which point the model is also updated for further use. + if (Mono.class.isAssignableFrom(errors.getClass())) { return ((Mono) errors).cast(Object.class); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java index 3375e264b91..ce9d92bd050 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Optional; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import org.springframework.beans.BeanUtils; @@ -111,20 +110,23 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR String name = ModelInitializer.getNameForParameter(parameter); Mono valueMono = prepareAttributeMono(name, valueType, context, exchange); + // unsafe(): we're intercepting, already serialized Publisher signals + Sinks.One bindingResultSink = Sinks.unsafe().one(); + Map model = context.getModel().asMap(); - MonoProcessor bindingResultMono = MonoProcessor.fromSink(Sinks.one()); - model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResultMono); + model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResultSink.asMono()); return valueMono.flatMap(value -> { WebExchangeDataBinder binder = context.createDataBinder(exchange, value, name); return bindRequestParameters(binder, exchange) - .doOnError(bindingResultMono::onError) + .doOnError(ex -> bindingResultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST)) .doOnSuccess(aVoid -> { validateIfApplicable(binder, parameter); - BindingResult errors = binder.getBindingResult(); - model.put(BindingResult.MODEL_KEY_PREFIX + name, errors); + BindingResult bindingResult = binder.getBindingResult(); + model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResult); model.put(name, value); - bindingResultMono.onNext(errors); + // serialized and buffered (should never fail) + bindingResultSink.tryEmitValue(bindingResult); }) .then(Mono.fromCallable(() -> { BindingResult errors = binder.getBindingResult(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 3cef0fbcf23..efab5724027 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -25,7 +25,6 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import reactor.util.concurrent.Queues; @@ -65,7 +64,11 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Nullable - private final MonoProcessor handlerCompletion; + private final Sinks.Empty handlerCompletionSink; + + @Nullable + @SuppressWarnings("deprecation") + private final reactor.core.publisher.MonoProcessor handlerCompletionMono; private final WebSocketReceivePublisher receivePublisher; @@ -74,33 +77,53 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc private final AtomicBoolean sendCalled = new AtomicBoolean(); - private final MonoProcessor closeStatusProcessor = MonoProcessor.fromSink(Sinks.one()); + private final Sinks.One closeStatusSink = Sinks.one(); /** * Base constructor. * @param delegate the native WebSocket session, channel, or connection * @param id the session id - * @param handshakeInfo the handshake info + * @param info the handshake info * @param bufferFactory the DataBuffer factor for the current connection */ public AbstractListenerWebSocketSession( - T delegate, String id, HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory) { + T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) { - this(delegate, id, handshakeInfo, bufferFactory, null); + this(delegate, id, info, bufferFactory, (Sinks.Empty) null); } /** - * Alternative constructor with completion {@code Mono} to propagate - * session completion (success or error). This is primarily for use with the - * {@code WebSocketClient} to be able to report the end of execution. + * Alternative constructor with completion sink to use to signal when the + * handling of the session is complete, with success or error. + *

Primarily for use with {@code WebSocketClient} to be able to + * communicate the end of handling. */ public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, - DataBufferFactory bufferFactory, @Nullable MonoProcessor handlerCompletion) { + DataBufferFactory bufferFactory, @Nullable Sinks.Empty handlerCompletionSink) { super(delegate, id, info, bufferFactory); this.receivePublisher = new WebSocketReceivePublisher(); - this.handlerCompletion = handlerCompletion; + this.handlerCompletionSink = handlerCompletionSink; + this.handlerCompletionMono = null; + } + + /** + * Alternative constructor with completion MonoProcessor to use to signal + * when the handling of the session is complete, with success or error. + *

Primarily for use with {@code WebSocketClient} to be able to + * communicate the end of handling. + * @deprecated as of 5.3 in favor of + * {@link #AbstractListenerWebSocketSession(Object, String, HandshakeInfo, DataBufferFactory, Sinks.Empty)} + */ + @Deprecated + public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, + DataBufferFactory bufferFactory, @Nullable reactor.core.publisher.MonoProcessor handlerCompletion) { + + super(delegate, id, info, bufferFactory); + this.receivePublisher = new WebSocketReceivePublisher(); + this.handlerCompletionMono = handlerCompletion; + this.handlerCompletionSink = null; } @@ -133,7 +156,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override public Mono closeStatus() { - return this.closeStatusProcessor; + return this.closeStatusSink.asMono(); } /** @@ -178,9 +201,10 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc this.receivePublisher.handleMessage(message); } - /** Handle an error callback from the WebSocketHandler adapter. */ + /** Handle an error callback from the WebSocket engine. */ void handleError(Throwable ex) { - this.closeStatusProcessor.onComplete(); + // Ignore result: can't overflow, ok if not first or no one listens + this.closeStatusSink.tryEmitEmpty(); this.receivePublisher.onError(ex); WebSocketSendProcessor sendProcessor = this.sendProcessor; if (sendProcessor != null) { @@ -189,9 +213,10 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc } } - /** Handle a close callback from the WebSocketHandler adapter. */ + /** Handle a close callback from the WebSocket engine. */ void handleClose(CloseStatus closeStatus) { - this.closeStatusProcessor.onNext(closeStatus); + // Ignore result: can't overflow, ok if not first or no one listens + this.closeStatusSink.tryEmitValue(closeStatus); this.receivePublisher.onAllDataRead(); WebSocketSendProcessor sendProcessor = this.sendProcessor; if (sendProcessor != null) { @@ -215,16 +240,24 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override public void onError(Throwable ex) { - if (this.handlerCompletion != null) { - this.handlerCompletion.onError(ex); + if (this.handlerCompletionSink != null) { + // Ignore result: can't overflow, ok if not first or no one listens + this.handlerCompletionSink.tryEmitError(ex); + } + if (this.handlerCompletionMono != null) { + this.handlerCompletionMono.onError(ex); } close(CloseStatus.SERVER_ERROR.withReason(ex.getMessage())); } @Override public void onComplete() { - if (this.handlerCompletion != null) { - this.handlerCompletion.onComplete(); + if (this.handlerCompletionSink != null) { + // Ignore result: can't overflow, ok if not first or no one listens + this.handlerCompletionSink.tryEmitEmpty(); + } + if (this.handlerCompletionMono != null) { + this.handlerCompletionMono.onComplete(); } close(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 187d606de50..e39d48a39d4 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WriteCallback; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.lang.Nullable; @@ -50,17 +50,24 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession) null); } public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, - @Nullable MonoProcessor completionMono) { + @Nullable Sinks.Empty completionSink) { - super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono); + super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionSink); // TODO: suspend causes failures if invoked at this stage // suspendReceiving(); } + @Deprecated + public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, + @Nullable reactor.core.publisher.MonoProcessor completionMono) { + + super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono); + } + @Override protected boolean canSuspendReceiving() { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java index 52ab788fe1f..7c84f2bc9df 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ import javax.websocket.SendResult; import javax.websocket.Session; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.lang.Nullable; @@ -47,11 +47,18 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class StandardWebSocketSession extends AbstractListenerWebSocketSession { public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { - this(session, info, factory, null); + this(session, info, factory, (Sinks.Empty) null); } public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, - @Nullable MonoProcessor completionMono) { + @Nullable Sinks.Empty completionSink) { + + super(session, session.getId(), info, factory, completionSink); + } + + @Deprecated + public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, + @Nullable reactor.core.publisher.MonoProcessor completionMono) { super(session, session.getId(), info, factory, completionMono); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index 6fc65bd47d5..48e78ccedcd 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.websocket.Session; import org.apache.tomcat.websocket.WsSession; -import reactor.core.publisher.MonoProcessor; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.HandshakeInfo; @@ -47,8 +46,9 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { super(session, info, factory); } + @SuppressWarnings("deprecation") public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, - MonoProcessor completionMono) { + reactor.core.publisher.MonoProcessor completionMono) { super(session, info, factory, completionMono); suspendReceiving(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 537fd47d725..ea88778e4aa 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import io.undertow.websockets.core.WebSocketCallback; import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.core.WebSockets; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -48,11 +48,19 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, DataBufferFactory factory) { - this(channel, info, factory, null); + this(channel, info, factory, (Sinks.Empty) null); } public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, - DataBufferFactory factory, @Nullable MonoProcessor completionMono) { + DataBufferFactory factory, @Nullable Sinks.Empty completionSink) { + + super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionSink); + suspendReceiving(); + } + + @Deprecated + public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, + DataBufferFactory factory, @Nullable reactor.core.publisher.MonoProcessor completionMono) { super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionMono); suspendReceiving(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java index ee18a456bf4..0dbe7b0ba16 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java @@ -26,7 +26,6 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.io.UpgradeListener; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import org.springframework.context.Lifecycle; @@ -137,26 +136,25 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle { } private Mono executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) { - MonoProcessor completionMono = MonoProcessor.fromSink(Sinks.one()); + Sinks.Empty completionSink = Sinks.empty(); return Mono.fromCallable( () -> { if (logger.isDebugEnabled()) { logger.debug("Connecting to " + url); } - Object jettyHandler = createHandler(url, handler, completionMono); + Object jettyHandler = createHandler(url, handler, completionSink); ClientUpgradeRequest request = new ClientUpgradeRequest(); request.setSubProtocols(handler.getSubProtocols()); UpgradeListener upgradeListener = new DefaultUpgradeListener(headers); return this.jettyClient.connect(jettyHandler, url, request, upgradeListener); }) - .then(completionMono); + .then(completionSink.asMono()); } - private Object createHandler(URI url, WebSocketHandler handler, MonoProcessor completion) { + private Object createHandler(URI url, WebSocketHandler handler, Sinks.Empty completion) { return new JettyWebSocketHandlerAdapter(handler, session -> { HandshakeInfo info = createHandshakeInfo(url, session); - return new JettyWebSocketSession( - session, info, DefaultDataBufferFactory.sharedInstance, completion); + return new JettyWebSocketSession(session, info, DefaultDataBufferFactory.sharedInstance, completion); }); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java index b46b30e0594..7d26e096bad 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java @@ -31,7 +31,6 @@ import javax.websocket.WebSocketContainer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; @@ -96,7 +95,7 @@ public class StandardWebSocketClient implements WebSocketClient { } private Mono executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) { - MonoProcessor completionMono = MonoProcessor.fromSink(Sinks.one()); + Sinks.Empty completionSink = Sinks.empty(); return Mono.fromCallable( () -> { if (logger.isDebugEnabled()) { @@ -104,16 +103,16 @@ public class StandardWebSocketClient implements WebSocketClient { } List protocols = handler.getSubProtocols(); DefaultConfigurator configurator = new DefaultConfigurator(requestHeaders); - Endpoint endpoint = createEndpoint(url, handler, completionMono, configurator); + Endpoint endpoint = createEndpoint(url, handler, completionSink, configurator); ClientEndpointConfig config = createEndpointConfig(configurator, protocols); return this.webSocketContainer.connectToServer(endpoint, config, url); }) .subscribeOn(Schedulers.boundedElastic()) // connectToServer is blocking - .then(completionMono); + .then(completionSink.asMono()); } private StandardWebSocketHandlerAdapter createEndpoint(URI url, WebSocketHandler handler, - MonoProcessor completion, DefaultConfigurator configurator) { + Sinks.Empty completion, DefaultConfigurator configurator) { return new StandardWebSocketHandlerAdapter(handler, session -> createWebSocketSession(session, createHandshakeInfo(url, configurator), completion)); @@ -126,9 +125,18 @@ public class StandardWebSocketClient implements WebSocketClient { } protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info, - MonoProcessor completion) { + Sinks.Empty completionSink) { - return new StandardWebSocketSession(session, info, DefaultDataBufferFactory.sharedInstance, completion); + return new StandardWebSocketSession( + session, info, DefaultDataBufferFactory.sharedInstance, completionSink); + } + + @Deprecated + protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info, + reactor.core.publisher.MonoProcessor completionMono) { + + return new StandardWebSocketSession( + session, info, DefaultDataBufferFactory.sharedInstance, completionMono); } private ClientEndpointConfig createEndpointConfig(Configurator configurator, List subProtocols) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java index 3980cf1a0a4..10e60cd2de8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java @@ -20,7 +20,6 @@ import javax.websocket.Session; import javax.websocket.WebSocketContainer; import org.apache.tomcat.websocket.WsWebSocketContainer; -import reactor.core.publisher.MonoProcessor; import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession; @@ -45,10 +44,11 @@ public class TomcatWebSocketClient extends StandardWebSocketClient { @Override + @SuppressWarnings("deprecation") protected StandardWebSocketSession createWebSocketSession(Session session, - HandshakeInfo info, MonoProcessor completion) { + HandshakeInfo info, reactor.core.publisher.MonoProcessor completionMono) { - return new TomcatWebSocketSession(session, info, bufferFactory(), completion); + return new TomcatWebSocketSession(session, info, bufferFactory(), completionMono); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java index b83c73f4bf9..03563efdc80 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java @@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFactory; import org.xnio.IoFuture; import org.xnio.XnioWorker; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBufferFactory; @@ -155,7 +154,7 @@ public class UndertowWebSocketClient implements WebSocketClient { } private Mono executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) { - MonoProcessor completion = MonoProcessor.fromSink(Sinks.one()); + Sinks.Empty completionSink = Sinks.empty(); return Mono.fromCallable( () -> { if (logger.isDebugEnabled()) { @@ -169,15 +168,17 @@ public class UndertowWebSocketClient implements WebSocketClient { new IoFuture.HandlingNotifier() { @Override public void handleDone(WebSocketChannel channel, Object attachment) { - handleChannel(url, handler, completion, negotiation, channel); + handleChannel(url, handler, completionSink, negotiation, channel); } @Override public void handleFailed(IOException ex, Object attachment) { - completion.onError(new IllegalStateException("Failed to connect to " + url, ex)); + // Ignore result: can't overflow, ok if not first or no one listens + completionSink.tryEmitError( + new IllegalStateException("Failed to connect to " + url, ex)); } }, null); }) - .then(completion); + .then(completionSink.asMono()); } /** @@ -194,12 +195,12 @@ public class UndertowWebSocketClient implements WebSocketClient { return builder; } - private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor completion, + private void handleChannel(URI url, WebSocketHandler handler, Sinks.Empty completionSink, DefaultNegotiation negotiation, WebSocketChannel channel) { HandshakeInfo info = createHandshakeInfo(url, negotiation); DataBufferFactory bufferFactory = DefaultDataBufferFactory.sharedInstance; - UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, bufferFactory, completion); + UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, bufferFactory, completionSink); UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session); channel.getReceiveSetter().set(adapter); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolverTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolverTests.java index 1fcba691ab6..38467b71078 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolverTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolverTests.java @@ -20,8 +20,6 @@ import java.time.Duration; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Sinks; import org.springframework.core.MethodParameter; import org.springframework.core.ReactiveAdapterRegistry; @@ -91,9 +89,7 @@ class ErrorsMethodArgumentResolverTests { @Test void resolveWithMono() { BindingResult bindingResult = createBindingResult(new Foo(), "foo"); - MonoProcessor monoProcessor = MonoProcessor.fromSink(Sinks.one()); - monoProcessor.onNext(bindingResult); - this.bindingContext.getModel().asMap().put(BindingResult.MODEL_KEY_PREFIX + "foo", monoProcessor); + this.bindingContext.getModel().asMap().put(BindingResult.MODEL_KEY_PREFIX + "foo", Mono.just(bindingResult)); MethodParameter parameter = this.testMethod.arg(Errors.class); Object actual = this.resolver.resolveArgument(parameter, this.bindingContext, this.exchange) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 17f0eaec37c..7cc9a20e0c0 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import reactor.core.publisher.Flux; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; @@ -224,7 +224,9 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { private static final Flux INTERVAL = testInterval(Duration.ofMillis(100), 50); - private MonoProcessor cancellation = MonoProcessor.fromSink(Sinks.one()); + private final Sinks.Empty cancelSink = Sinks.empty(); + + private Mono cancellation = cancelSink.asMono(); @GetMapping("/string") @@ -250,7 +252,7 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { Flux infinite() { return Flux.just(0, 1).map(l -> "foo " + l) .mergeWith(Flux.never()) - .doOnCancel(() -> cancellation.onComplete()); + .doOnCancel(() -> cancelSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST)); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index be5471c6a1c..072cfddd230 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -27,8 +27,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Sinks; import reactor.util.retry.Retry; import org.springframework.context.annotation.Bean; @@ -99,7 +97,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { String protocol = "echo-v1"; AtomicReference infoRef = new AtomicReference<>(); - MonoProcessor output = MonoProcessor.fromSink(Sinks.unsafe().one()); + AtomicReference protocolRef = new AtomicReference<>(); this.client.execute(getUrl("/sub-protocol"), new WebSocketHandler() { @@ -113,7 +111,8 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { infoRef.set(session.getHandshakeInfo()); return session.receive() .map(WebSocketMessage::getPayloadAsText) - .subscribeWith(output) + .doOnNext(protocolRef::set) + .doOnError(protocolRef::set) .then(); } }) @@ -123,7 +122,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { assertThat(info.getHeaders().getFirst("Upgrade")).isEqualToIgnoringCase("websocket"); assertThat(info.getHeaders().getFirst("Sec-WebSocket-Protocol")).isEqualTo(protocol); assertThat(info.getSubProtocol()).as("Wrong protocol accepted").isEqualTo(protocol); - assertThat(output.block(TIMEOUT)).as("Wrong protocol detected on the server side").isEqualTo(protocol); + assertThat(protocolRef.get()).as("Wrong protocol detected on the server side").isEqualTo(protocol); } @ParameterizedWebSocketTest @@ -132,27 +131,28 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { HttpHeaders headers = new HttpHeaders(); headers.add("my-header", "my-value"); - MonoProcessor output = MonoProcessor.fromSink(Sinks.unsafe().one()); + AtomicReference headerRef = new AtomicReference<>(); this.client.execute(getUrl("/custom-header"), headers, session -> session.receive() .map(WebSocketMessage::getPayloadAsText) - .subscribeWith(output) + .doOnNext(headerRef::set) + .doOnError(headerRef::set) .then()) .block(TIMEOUT); - assertThat(output.block(TIMEOUT)).isEqualTo("my-header:my-value"); + assertThat(headerRef.get()).isEqualTo("my-header:my-value"); } @ParameterizedWebSocketTest void sessionClosing(WebSocketClient client, HttpServer server, Class serverConfigClass) throws Exception { startServer(client, server, serverConfigClass); - MonoProcessor statusProcessor = MonoProcessor.fromSink(Sinks.unsafe().one()); + AtomicReference statusRef = new AtomicReference<>(); this.client.execute(getUrl("/close"), session -> { logger.debug("Starting.."); - session.closeStatus().subscribe(statusProcessor); + session.closeStatus().subscribe(statusRef::set, statusRef::set, () -> {}); return session.receive() .doOnNext(s -> logger.debug("inbound " + s)) .then() @@ -162,25 +162,26 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { }) .block(TIMEOUT); - assertThat(statusProcessor.block()).isEqualTo(CloseStatus.GOING_AWAY); + assertThat(statusRef.get()).isEqualTo(CloseStatus.GOING_AWAY); } @ParameterizedWebSocketTest void cookie(WebSocketClient client, HttpServer server, Class serverConfigClass) throws Exception { startServer(client, server, serverConfigClass); - MonoProcessor output = MonoProcessor.fromSink(Sinks.unsafe().one()); AtomicReference cookie = new AtomicReference<>(); + AtomicReference receivedCookieRef = new AtomicReference<>(); this.client.execute(getUrl("/cookie"), session -> { cookie.set(session.getHandshakeInfo().getHeaders().getFirst("Set-Cookie")); return session.receive() .map(WebSocketMessage::getPayloadAsText) - .subscribeWith(output) + .doOnNext(receivedCookieRef::set) + .doOnError(receivedCookieRef::set) .then(); }) .block(TIMEOUT); - assertThat(output.block(TIMEOUT)).isEqualTo("cookie"); + assertThat(receivedCookieRef.get()).isEqualTo("cookie"); assertThat(cookie.get()).isEqualTo("project=spring"); } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java index ae3d4b2bdeb..d7952e28a09 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java @@ -32,7 +32,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import org.springframework.core.MethodParameter; @@ -109,12 +108,18 @@ public class ReactiveTypeHandlerTests { public void deferredResultSubscriberWithOneValue() throws Exception { // Mono - MonoProcessor mono = MonoProcessor.fromSink(Sinks.one()); - testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onNext("foo"), "foo"); + Sinks.One sink = Sinks.one(); + testDeferredResultSubscriber( + sink.asMono(), Mono.class, forClass(String.class), + () -> sink.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST), + "foo"); // Mono empty - MonoProcessor monoEmpty = MonoProcessor.fromSink(Sinks.one()); - testDeferredResultSubscriber(monoEmpty, Mono.class, forClass(String.class), monoEmpty::onComplete, null); + Sinks.One emptySink = Sinks.one(); + testDeferredResultSubscriber( + emptySink.asMono(), Mono.class, forClass(String.class), + () -> emptySink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST), + null); // RxJava Single AtomicReference> ref2 = new AtomicReference<>(); @@ -125,8 +130,10 @@ public class ReactiveTypeHandlerTests { @Test public void deferredResultSubscriberWithNoValues() throws Exception { - MonoProcessor monoEmpty = MonoProcessor.fromSink(Sinks.one()); - testDeferredResultSubscriber(monoEmpty, Mono.class, forClass(String.class), monoEmpty::onComplete, null); + Sinks.One sink = Sinks.one(); + testDeferredResultSubscriber(sink.asMono(), Mono.class, forClass(String.class), + () -> sink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST), + null); } @Test @@ -152,8 +159,9 @@ public class ReactiveTypeHandlerTests { IllegalStateException ex = new IllegalStateException(); // Mono - MonoProcessor mono = MonoProcessor.fromSink(Sinks.one()); - testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onError(ex), ex); + Sinks.One sink = Sinks.one(); + testDeferredResultSubscriber(sink.asMono(), Mono.class, forClass(String.class), + () -> sink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST), ex); // RxJava Single AtomicReference> ref2 = new AtomicReference<>();