Remove use of MonoProcessor.fromSinks

See gh-25884
This commit is contained in:
Rossen Stoyanchev 2020-10-09 20:45:27 +01:00
parent cdd48ddd7f
commit e73e489fd8
30 changed files with 300 additions and 227 deletions

View File

@ -18,12 +18,12 @@ package org.springframework.core.codec;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
@ -93,16 +93,21 @@ public interface Decoder<T> {
default T decode(DataBuffer buffer, ResolvableType targetType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {
MonoProcessor<T> processor = MonoProcessor.fromSink(Sinks.one());
decodeToMono(Mono.just(buffer), targetType, mimeType, hints).subscribeWith(processor);
CompletableFuture<T> future = decodeToMono(Mono.just(buffer), targetType, mimeType, hints).toFuture();
Assert.state(future.isDone(), "DataBuffer decoding should have completed.");
Assert.state(processor.isTerminated(), "DataBuffer decoding should have completed.");
Throwable ex = processor.getError();
if (ex != null) {
throw (ex instanceof CodecException ? (CodecException) ex :
new DecodingException("Failed to decode: " + ex.getMessage(), ex));
Throwable failure;
try {
return future.get();
}
return processor.peek();
catch (ExecutionException ex) {
failure = ex.getCause();
}
catch (InterruptedException ex) {
failure = ex;
}
throw (failure instanceof CodecException ? (CodecException) failure :
new DecodingException("Failed to decode: " + failure.getMessage(), failure));
}
/**

View File

@ -27,8 +27,6 @@ import io.rsocket.frame.FrameType;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
@ -162,8 +160,9 @@ class MessagingRSocket implements RSocket {
((NettyDataBuffer) dataBuffer).getNativeBuffer().refCnt() : 1;
}
@SuppressWarnings("deprecation")
private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType, Flux<Payload> payloads) {
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.fromSink(Sinks.one());
reactor.core.publisher.MonoProcessor<Flux<Payload>> replyMono = reactor.core.publisher.MonoProcessor.create();
MessageHeaders headers = createHeaders(firstPayload, frameType, replyMono);
AtomicBoolean read = new AtomicBoolean();
@ -186,8 +185,9 @@ class MessagingRSocket implements RSocket {
return PayloadUtils.retainDataAndReleasePayload(payload, this.strategies.dataBufferFactory());
}
@SuppressWarnings("deprecation")
private MessageHeaders createHeaders(Payload payload, FrameType frameType,
@Nullable MonoProcessor<?> replyMono) {
@Nullable reactor.core.publisher.MonoProcessor<?> replyMono) {
MessageHeaderAccessor headers = new MessageHeaderAccessor();
headers.setLeaveMutable(true);

View File

@ -21,7 +21,6 @@ import java.util.List;
import io.rsocket.Payload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@ -36,7 +35,7 @@ import org.springframework.util.Assert;
/**
* Extension of {@link AbstractEncoderMethodReturnValueHandler} that
* {@link #handleEncodedContent handles} encoded content by wrapping data buffers
* as RSocket payloads and by passing those to the {@link MonoProcessor}
* as RSocket payloads and by passing those to the {@link reactor.core.publisher.MonoProcessor}
* from the {@link #RESPONSE_HEADER} header.
*
* @author Rossen Stoyanchev
@ -45,7 +44,7 @@ import org.springframework.util.Assert;
public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodReturnValueHandler {
/**
* Message header name that is expected to have a {@link MonoProcessor}
* Message header name that is expected to have a {@link reactor.core.publisher.MonoProcessor}
* which will receive the {@code Flux<Payload>} that represents the response.
*/
public static final String RESPONSE_HEADER = "rsocketResponse";
@ -57,11 +56,11 @@ public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodRetur
@Override
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "deprecation"})
protected Mono<Void> handleEncodedContent(
Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message) {
MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
reactor.core.publisher.MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
Assert.notNull(replyMono, "Missing '" + RESPONSE_HEADER + "'");
replyMono.onNext(encodedContent.map(PayloadUtils::createPayload));
replyMono.onComplete();
@ -69,8 +68,9 @@ public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodRetur
}
@Override
@SuppressWarnings("deprecation")
protected Mono<Void> handleNoContent(MethodParameter returnType, Message<?> message) {
MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
reactor.core.publisher.MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
if (replyMono != null) {
replyMono.onComplete();
}
@ -78,11 +78,11 @@ public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodRetur
}
@Nullable
@SuppressWarnings("unchecked")
private MonoProcessor<Flux<Payload>> getReplyMono(Message<?> message) {
@SuppressWarnings({"unchecked", "deprecation"})
private reactor.core.publisher.MonoProcessor<Flux<Payload>> getReplyMono(Message<?> message) {
Object headerValue = message.getHeaders().get(RESPONSE_HEADER);
Assert.state(headerValue == null || headerValue instanceof MonoProcessor, "Expected MonoProcessor");
return (MonoProcessor<Flux<Payload>>) headerValue;
Assert.state(headerValue == null || headerValue instanceof reactor.core.publisher.MonoProcessor, "Expected MonoProcessor");
return (reactor.core.publisher.MonoProcessor<Flux<Payload>>) headerValue;
}
}

View File

@ -19,8 +19,8 @@ package org.springframework.messaging.tcp.reactor;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
@ -33,7 +33,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -53,6 +52,7 @@ import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
import org.springframework.util.concurrent.SettableListenableFuture;
@ -205,13 +205,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
// Report first connect to the ListenableFuture
MonoProcessor<Void> connectMono = MonoProcessor.fromSink(Sinks.one());
CompletableFuture<Void> connectFuture = new CompletableFuture<>();
this.tcpClient
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono))
.doOnNext(conn -> connectFuture.complete(null))
.doOnError(connectFuture::completeExceptionally)
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(Connection::onDispose) // post-connect issues
.retryWhen(Retry.from(signals -> signals
@ -222,7 +222,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
.flatMap(attempt -> reconnect(attempt, strategy)))
.subscribe();
return new MonoToListenableFutureAdapter<>(connectMono);
return new CompletableToListenableFutureAdapter<>(connectFuture);
}
private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
@ -231,19 +231,6 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
return new MonoToListenableFutureAdapter<>(Mono.error(ex));
}
private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
return o -> {
if (!connectMono.isTerminated()) {
if (o instanceof Throwable) {
connectMono.onError((Throwable) o);
}
else {
connectMono.onComplete();
}
}
};
}
private Publisher<? extends Long> reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) {
Long time = reconnectStrategy.getTimeToNextAttempt(attempt);
return (time != null ? Mono.delay(Duration.ofMillis(time), this.scheduler) : Mono.empty());
@ -316,8 +303,8 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
logger.debug("Connected to " + conn.address());
}
});
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
Sinks.Empty<Void> completionSink = Sinks.empty();
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completionSink);
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));
@ -330,7 +317,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
this.connectionHandler::handleFailure,
this.connectionHandler::afterConnectionClosed);
return completion;
return completionSink.asMono();
}
}

View File

@ -18,7 +18,7 @@ package org.springframework.messaging.tcp.reactor;
import io.netty.buffer.ByteBuf;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
@ -42,16 +42,16 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
private final ReactorNettyCodec<P> codec;
private final MonoProcessor<Void> closeProcessor;
private final Sinks.Empty<Void> completionSink;
public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,
ReactorNettyCodec<P> codec, MonoProcessor<Void> closeProcessor) {
ReactorNettyCodec<P> codec, Sinks.Empty<Void> completionSink) {
this.inbound = inbound;
this.outbound = outbound;
this.codec = codec;
this.closeProcessor = closeProcessor;
this.completionSink = completionSink;
}
@ -75,7 +75,8 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
@Override
public void close() {
this.closeProcessor.onComplete();
// Ignore result: can't overflow, ok if not first or no one listens
this.completionSink.tryEmitEmpty();
}
}

View File

@ -28,7 +28,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
@ -126,15 +125,15 @@ class RSocketServerToClientIntegrationTests {
static class ServerController {
// Must be initialized by @Test method...
volatile MonoProcessor<Void> result;
volatile Sinks.Empty<Void> resultSink;
void reset() {
this.result = MonoProcessor.fromSink(Sinks.one());
this.resultSink = Sinks.empty();
}
void await(Duration duration) {
this.result.block(duration);
this.resultSink.asMono().block(duration);
}
@ -201,8 +200,8 @@ class RSocketServerToClientIntegrationTests {
private void runTest(Runnable testEcho) {
Mono.fromRunnable(testEcho)
.doOnError(ex -> result.onError(ex))
.doOnSuccess(o -> result.onComplete())
.doOnError(ex -> resultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST))
.doOnSuccess(o -> resultSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))
.subscribeOn(Schedulers.boundedElastic()) // StepVerifier will block
.subscribe();
}

View File

@ -34,7 +34,6 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.context.support.StaticApplicationContext;
@ -342,8 +341,8 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/mono");
this.messageHandler.handleMessage(message);
assertThat(controller.monoProcessor).isNotNull();
controller.monoProcessor.onNext("foo");
assertThat(controller.oneSink).isNotNull();
controller.oneSink.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST);
verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
assertThat(this.payloadCaptor.getValue()).isEqualTo("foo");
}
@ -357,7 +356,7 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/mono");
this.messageHandler.handleMessage(message);
controller.monoProcessor.onError(new IllegalStateException());
controller.oneSink.emitError(new IllegalStateException(), Sinks.EmitFailureHandler.FAIL_FAST);
assertThat(controller.exceptionCaught).isTrue();
}
@ -370,8 +369,8 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/flux");
this.messageHandler.handleMessage(message);
assertThat(controller.fluxSink).isNotNull();
controller.fluxSink.tryEmitNext("foo");
assertThat(controller.manySink).isNotNull();
controller.manySink.tryEmitNext("foo");
verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class));
}
@ -585,22 +584,22 @@ public class SimpAnnotationMethodMessageHandlerTests {
@Controller
private static class ReactiveController {
private MonoProcessor<String> monoProcessor;
private Sinks.One<String> oneSink;
private Sinks.Many<String> fluxSink;
private Sinks.Many<String> manySink;
private boolean exceptionCaught = false;
@MessageMapping("mono")
public Mono<String> handleMono() {
this.monoProcessor = MonoProcessor.fromSink(Sinks.one());
return this.monoProcessor;
this.oneSink = Sinks.one();
return this.oneSink.asMono();
}
@MessageMapping("flux")
public Flux<String> handleFlux() {
this.fluxSink = Sinks.many().unicast().onBackpressureBuffer();
return this.fluxSink.asFlux();
this.manySink = Sinks.many().unicast().onBackpressureBuffer();
return this.manySink.asFlux();
}
@MessageExceptionHandler(IllegalStateException.class)

View File

@ -25,7 +25,6 @@ import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
@ -64,11 +63,14 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
super(dataBufferFactory);
this.writeHandler = body -> {
// Avoid .then() which causes data buffers to be released
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache();
// Avoid .then() that causes data buffers to be discarded and released
Sinks.Empty<Void> completion = Sinks.unsafe().empty();
this.body = body
.doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized
.doOnError(completion::tryEmitError)
.cache();
this.body.subscribe();
return completion;
return completion.asMono();
};
}

View File

@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
@ -84,8 +83,8 @@ public class HttpHandlerConnector implements ClientHttpConnector {
private Mono<ClientHttpResponse> doConnect(
HttpMethod httpMethod, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
MonoProcessor<Void> requestWriteCompletion = MonoProcessor.fromSink(Sinks.one());
MonoProcessor<Void> handlerCompletion = MonoProcessor.fromSink(Sinks.one());
Sinks.Empty<Void> requestWriteCompletion = Sinks.empty();
Sinks.Empty<Void> handlerCompletion = Sinks.empty();
ClientHttpResponse[] savedResponse = new ClientHttpResponse[1];
MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
@ -95,7 +94,10 @@ public class HttpHandlerConnector implements ClientHttpConnector {
log("Invoking HttpHandler for ", httpMethod, uri);
ServerHttpRequest mockServerRequest = adaptRequest(mockClientRequest, requestBody);
ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest);
this.handler.handle(mockServerRequest, responseToUse).subscribe(handlerCompletion);
this.handler.handle(mockServerRequest, responseToUse).subscribe(
aVoid -> {},
handlerCompletion::tryEmitError, // Ignore error: cached + serialized
handlerCompletion::tryEmitEmpty);
return Mono.empty();
});
@ -106,9 +108,12 @@ public class HttpHandlerConnector implements ClientHttpConnector {
}));
log("Writing client request for ", httpMethod, uri);
requestCallback.apply(mockClientRequest).subscribe(requestWriteCompletion);
requestCallback.apply(mockClientRequest).subscribe(
aVoid -> {},
requestWriteCompletion::tryEmitError, // Ignore error: cached + serialized
requestWriteCompletion::tryEmitEmpty);
return Mono.when(requestWriteCompletion, handlerCompletion)
return Mono.when(requestWriteCompletion.asMono(), handlerCompletion.asMono())
.onErrorMap(ex -> {
ClientHttpResponse response = savedResponse[0];
return response != null ? new FailureAfterResponseCompletedException(response, ex) : ex;

View File

@ -24,9 +24,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
@ -138,7 +138,8 @@ class WiretapConnector implements ClientHttpConnector {
private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer();
private final MonoProcessor<byte[]> content = MonoProcessor.fromSink(Sinks.one());
// unsafe(): we're intercepting, already serialized Publisher signals
private final Sinks.One<byte[]> content = Sinks.unsafe().one();
private boolean hasContentConsumer;
@ -167,7 +168,8 @@ class WiretapConnector implements ClientHttpConnector {
.doOnComplete(this::handleOnComplete) : null;
if (publisher == null && publisherNested == null) {
this.content.onComplete();
// Ignore result: OK or not relevant
this.content.tryEmitEmpty();
}
}
@ -184,8 +186,8 @@ class WiretapConnector implements ClientHttpConnector {
public Mono<byte[]> getContent() {
return Mono.defer(() -> {
if (this.content.isTerminated()) {
return this.content;
if (this.content.scan(Scannable.Attr.TERMINATED) == Boolean.TRUE) {
return this.content.asMono();
}
if (!this.hasContentConsumer) {
// Couple of possible cases:
@ -198,23 +200,21 @@ class WiretapConnector implements ClientHttpConnector {
"an error was raised while attempting to produce it.", ex))
.subscribe();
}
return this.content;
return this.content.asMono();
});
}
private void handleOnError(Throwable ex) {
if (!this.content.isTerminated()) {
this.content.onError(ex);
}
// Ignore result: OK or not relevant
this.content.tryEmitError(ex);
}
private void handleOnComplete() {
if (!this.content.isTerminated()) {
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
this.content.onNext(bytes);
}
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
// Ignore result: OK or not relevant
this.content.tryEmitValue(bytes);
}
}

View File

@ -19,8 +19,7 @@ import java.net.URI;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
@ -128,10 +127,9 @@ public class CookieAssertionTests {
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.getCookies().add(cookie.getName(), cookie);
MonoProcessor<byte[]> emptyContent = MonoProcessor.fromSink(Sinks.one());
emptyContent.onComplete();
ExchangeResult result = new ExchangeResult(
request, response, Mono.empty(), Mono.empty(), Duration.ZERO, null, null);
ExchangeResult result = new ExchangeResult(request, response, emptyContent, emptyContent, Duration.ZERO, null, null);
return new CookieAssertions(result, mock(WebTestClient.ResponseSpec.class));
}

View File

@ -23,8 +23,7 @@ import java.time.ZonedDateTime;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Mono;
import org.springframework.http.CacheControl;
import org.springframework.http.HttpHeaders;
@ -241,10 +240,9 @@ class HeaderAssertionTests {
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.getHeaders().putAll(responseHeaders);
MonoProcessor<byte[]> emptyContent = MonoProcessor.fromSink(Sinks.one());
emptyContent.onComplete();
ExchangeResult result = new ExchangeResult(
request, response, Mono.empty(), Mono.empty(), Duration.ZERO, null, null);
ExchangeResult result = new ExchangeResult(request, response, emptyContent, emptyContent, Duration.ZERO, null, null);
return new HeaderAssertions(result, mock(WebTestClient.ResponseSpec.class));
}

View File

@ -20,8 +20,7 @@ import java.net.URI;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
@ -157,10 +156,9 @@ class StatusAssertionTests {
MockClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, URI.create("/"));
MockClientHttpResponse response = new MockClientHttpResponse(status);
MonoProcessor<byte[]> emptyContent = MonoProcessor.fromSink(Sinks.one());
emptyContent.onComplete();
ExchangeResult result = new ExchangeResult(
request, response, Mono.empty(), Mono.empty(), Duration.ZERO, null, null);
ExchangeResult result = new ExchangeResult(request, response, emptyContent, emptyContent, Duration.ZERO, null, null);
return new StatusAssertions(result, mock(WebTestClient.ResponseSpec.class));
}

View File

@ -25,7 +25,6 @@ import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
@ -64,11 +63,14 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
super(dataBufferFactory);
this.writeHandler = body -> {
// Avoid .then() which causes data buffers to be released
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache();
// Avoid .then() that causes data buffers to be discarded and released
Sinks.Empty<Void> completion = Sinks.unsafe().empty();
this.body = body
.doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized
.doOnError(completion::tryEmitError)
.cache();
this.body.subscribe();
return completion;
return completion.asMono();
};
}

View File

@ -18,11 +18,10 @@ package org.springframework.web.reactive.result.method;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.lang.Nullable;
@ -102,22 +101,26 @@ public class SyncInvocableHandlerMethod extends HandlerMethod {
public HandlerResult invokeForHandlerResult(ServerWebExchange exchange,
BindingContext bindingContext, Object... providedArgs) {
MonoProcessor<HandlerResult> processor = MonoProcessor.fromSink(Sinks.unsafe().one());
this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor);
CompletableFuture<HandlerResult> future =
this.delegate.invoke(exchange, bindingContext, providedArgs).toFuture();
if (processor.isTerminated()) {
Throwable ex = processor.getError();
if (ex != null) {
throw (ex instanceof ServerErrorException ? (ServerErrorException) ex :
new ServerErrorException("Failed to invoke: " + getShortLogMessage(), getMethod(), ex));
}
return processor.peek();
}
else {
// Should never happen...
if (!future.isDone()) {
throw new IllegalStateException(
"SyncInvocableHandlerMethod should have completed synchronously.");
}
Throwable failure;
try {
return future.get();
}
catch (ExecutionException ex) {
failure = ex.getCause();
}
catch (InterruptedException ex) {
failure = ex;
}
throw (new ServerErrorException(
"Failed to invoke: " + getShortLogMessage(), getMethod(), failure));
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -56,6 +56,11 @@ public class ErrorsMethodArgumentResolver extends HandlerMethodArgumentResolverS
MethodParameter parameter, BindingContext context, ServerWebExchange exchange) {
Object errors = getErrors(parameter, context);
// Initially Errors/BindingResult is a Mono in the model even if it cannot be declared
// as an async argument. That way it can be resolved first while the Mono can complete
// later at which point the model is also updated for further use.
if (Mono.class.isAssignableFrom(errors.getClass())) {
return ((Mono<?>) errors).cast(Object.class);
}

View File

@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Optional;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.beans.BeanUtils;
@ -111,20 +110,23 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR
String name = ModelInitializer.getNameForParameter(parameter);
Mono<?> valueMono = prepareAttributeMono(name, valueType, context, exchange);
// unsafe(): we're intercepting, already serialized Publisher signals
Sinks.One<BindingResult> bindingResultSink = Sinks.unsafe().one();
Map<String, Object> model = context.getModel().asMap();
MonoProcessor<BindingResult> bindingResultMono = MonoProcessor.fromSink(Sinks.one());
model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResultMono);
model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResultSink.asMono());
return valueMono.flatMap(value -> {
WebExchangeDataBinder binder = context.createDataBinder(exchange, value, name);
return bindRequestParameters(binder, exchange)
.doOnError(bindingResultMono::onError)
.doOnError(ex -> bindingResultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST))
.doOnSuccess(aVoid -> {
validateIfApplicable(binder, parameter);
BindingResult errors = binder.getBindingResult();
model.put(BindingResult.MODEL_KEY_PREFIX + name, errors);
BindingResult bindingResult = binder.getBindingResult();
model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResult);
model.put(name, value);
bindingResultMono.onNext(errors);
// serialized and buffered (should never fail)
bindingResultSink.tryEmitValue(bindingResult);
})
.then(Mono.fromCallable(() -> {
BindingResult errors = binder.getBindingResult();

View File

@ -25,7 +25,6 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;
@ -65,7 +64,11 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Nullable
private final MonoProcessor<Void> handlerCompletion;
private final Sinks.Empty<Void> handlerCompletionSink;
@Nullable
@SuppressWarnings("deprecation")
private final reactor.core.publisher.MonoProcessor<Void> handlerCompletionMono;
private final WebSocketReceivePublisher receivePublisher;
@ -74,33 +77,53 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
private final AtomicBoolean sendCalled = new AtomicBoolean();
private final MonoProcessor<CloseStatus> closeStatusProcessor = MonoProcessor.fromSink(Sinks.one());
private final Sinks.One<CloseStatus> closeStatusSink = Sinks.one();
/**
* Base constructor.
* @param delegate the native WebSocket session, channel, or connection
* @param id the session id
* @param handshakeInfo the handshake info
* @param info the handshake info
* @param bufferFactory the DataBuffer factor for the current connection
*/
public AbstractListenerWebSocketSession(
T delegate, String id, HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory) {
T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
this(delegate, id, handshakeInfo, bufferFactory, null);
this(delegate, id, info, bufferFactory, (Sinks.Empty<Void>) null);
}
/**
* Alternative constructor with completion {@code Mono<Void>} to propagate
* session completion (success or error). This is primarily for use with the
* {@code WebSocketClient} to be able to report the end of execution.
* Alternative constructor with completion sink to use to signal when the
* handling of the session is complete, with success or error.
* <p>Primarily for use with {@code WebSocketClient} to be able to
* communicate the end of handling.
*/
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> handlerCompletion) {
DataBufferFactory bufferFactory, @Nullable Sinks.Empty<Void> handlerCompletionSink) {
super(delegate, id, info, bufferFactory);
this.receivePublisher = new WebSocketReceivePublisher();
this.handlerCompletion = handlerCompletion;
this.handlerCompletionSink = handlerCompletionSink;
this.handlerCompletionMono = null;
}
/**
* Alternative constructor with completion MonoProcessor to use to signal
* when the handling of the session is complete, with success or error.
* <p>Primarily for use with {@code WebSocketClient} to be able to
* communicate the end of handling.
* @deprecated as of 5.3 in favor of
* {@link #AbstractListenerWebSocketSession(Object, String, HandshakeInfo, DataBufferFactory, Sinks.Empty)}
*/
@Deprecated
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
DataBufferFactory bufferFactory, @Nullable reactor.core.publisher.MonoProcessor<Void> handlerCompletion) {
super(delegate, id, info, bufferFactory);
this.receivePublisher = new WebSocketReceivePublisher();
this.handlerCompletionMono = handlerCompletion;
this.handlerCompletionSink = null;
}
@ -133,7 +156,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Override
public Mono<CloseStatus> closeStatus() {
return this.closeStatusProcessor;
return this.closeStatusSink.asMono();
}
/**
@ -178,9 +201,10 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
this.receivePublisher.handleMessage(message);
}
/** Handle an error callback from the WebSocketHandler adapter. */
/** Handle an error callback from the WebSocket engine. */
void handleError(Throwable ex) {
this.closeStatusProcessor.onComplete();
// Ignore result: can't overflow, ok if not first or no one listens
this.closeStatusSink.tryEmitEmpty();
this.receivePublisher.onError(ex);
WebSocketSendProcessor sendProcessor = this.sendProcessor;
if (sendProcessor != null) {
@ -189,9 +213,10 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
}
}
/** Handle a close callback from the WebSocketHandler adapter. */
/** Handle a close callback from the WebSocket engine. */
void handleClose(CloseStatus closeStatus) {
this.closeStatusProcessor.onNext(closeStatus);
// Ignore result: can't overflow, ok if not first or no one listens
this.closeStatusSink.tryEmitValue(closeStatus);
this.receivePublisher.onAllDataRead();
WebSocketSendProcessor sendProcessor = this.sendProcessor;
if (sendProcessor != null) {
@ -215,16 +240,24 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Override
public void onError(Throwable ex) {
if (this.handlerCompletion != null) {
this.handlerCompletion.onError(ex);
if (this.handlerCompletionSink != null) {
// Ignore result: can't overflow, ok if not first or no one listens
this.handlerCompletionSink.tryEmitError(ex);
}
if (this.handlerCompletionMono != null) {
this.handlerCompletionMono.onError(ex);
}
close(CloseStatus.SERVER_ERROR.withReason(ex.getMessage()));
}
@Override
public void onComplete() {
if (this.handlerCompletion != null) {
this.handlerCompletion.onComplete();
if (this.handlerCompletionSink != null) {
// Ignore result: can't overflow, ok if not first or no one listens
this.handlerCompletionSink.tryEmitEmpty();
}
if (this.handlerCompletionMono != null) {
this.handlerCompletionMono.onComplete();
}
close();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,7 +24,7 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WriteCallback;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.lang.Nullable;
@ -50,17 +50,24 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
this(session, info, factory, null);
this(session, info, factory, (Sinks.Empty<Void>) null);
}
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable MonoProcessor<Void> completionMono) {
@Nullable Sinks.Empty<Void> completionSink) {
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionSink);
// TODO: suspend causes failures if invoked at this stage
// suspendReceiving();
}
@Deprecated
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable reactor.core.publisher.MonoProcessor<Void> completionMono) {
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
}
@Override
protected boolean canSuspendReceiving() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,7 +27,7 @@ import javax.websocket.SendResult;
import javax.websocket.Session;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.lang.Nullable;
@ -47,11 +47,18 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class StandardWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
this(session, info, factory, null);
this(session, info, factory, (Sinks.Empty<Void>) null);
}
public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable MonoProcessor<Void> completionMono) {
@Nullable Sinks.Empty<Void> completionSink) {
super(session, session.getId(), info, factory, completionSink);
}
@Deprecated
public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable reactor.core.publisher.MonoProcessor<Void> completionMono) {
super(session, session.getId(), info, factory, completionMono);
}

View File

@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.websocket.Session;
import org.apache.tomcat.websocket.WsSession;
import reactor.core.publisher.MonoProcessor;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.HandshakeInfo;
@ -47,8 +46,9 @@ public class TomcatWebSocketSession extends StandardWebSocketSession {
super(session, info, factory);
}
@SuppressWarnings("deprecation")
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
MonoProcessor<Void> completionMono) {
reactor.core.publisher.MonoProcessor<Void> completionMono) {
super(session, info, factory, completionMono);
suspendReceiving();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,7 +25,7 @@ import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -48,11 +48,19 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> {
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, DataBufferFactory factory) {
this(channel, info, factory, null);
this(channel, info, factory, (Sinks.Empty<Void>) null);
}
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info,
DataBufferFactory factory, @Nullable MonoProcessor<Void> completionMono) {
DataBufferFactory factory, @Nullable Sinks.Empty<Void> completionSink) {
super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionSink);
suspendReceiving();
}
@Deprecated
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info,
DataBufferFactory factory, @Nullable reactor.core.publisher.MonoProcessor<Void> completionMono) {
super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionMono);
suspendReceiving();

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.io.UpgradeListener;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.context.Lifecycle;
@ -137,26 +136,25 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
}
private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
MonoProcessor<Void> completionMono = MonoProcessor.fromSink(Sinks.one());
Sinks.Empty<Void> completionSink = Sinks.empty();
return Mono.fromCallable(
() -> {
if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
Object jettyHandler = createHandler(url, handler, completionMono);
Object jettyHandler = createHandler(url, handler, completionSink);
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols(handler.getSubProtocols());
UpgradeListener upgradeListener = new DefaultUpgradeListener(headers);
return this.jettyClient.connect(jettyHandler, url, request, upgradeListener);
})
.then(completionMono);
.then(completionSink.asMono());
}
private Object createHandler(URI url, WebSocketHandler handler, MonoProcessor<Void> completion) {
private Object createHandler(URI url, WebSocketHandler handler, Sinks.Empty<Void> completion) {
return new JettyWebSocketHandlerAdapter(handler, session -> {
HandshakeInfo info = createHandshakeInfo(url, session);
return new JettyWebSocketSession(
session, info, DefaultDataBufferFactory.sharedInstance, completion);
return new JettyWebSocketSession(session, info, DefaultDataBufferFactory.sharedInstance, completion);
});
}

View File

@ -31,7 +31,6 @@ import javax.websocket.WebSocketContainer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
@ -96,7 +95,7 @@ public class StandardWebSocketClient implements WebSocketClient {
}
private Mono<Void> executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
MonoProcessor<Void> completionMono = MonoProcessor.fromSink(Sinks.one());
Sinks.Empty<Void> completionSink = Sinks.empty();
return Mono.fromCallable(
() -> {
if (logger.isDebugEnabled()) {
@ -104,16 +103,16 @@ public class StandardWebSocketClient implements WebSocketClient {
}
List<String> protocols = handler.getSubProtocols();
DefaultConfigurator configurator = new DefaultConfigurator(requestHeaders);
Endpoint endpoint = createEndpoint(url, handler, completionMono, configurator);
Endpoint endpoint = createEndpoint(url, handler, completionSink, configurator);
ClientEndpointConfig config = createEndpointConfig(configurator, protocols);
return this.webSocketContainer.connectToServer(endpoint, config, url);
})
.subscribeOn(Schedulers.boundedElastic()) // connectToServer is blocking
.then(completionMono);
.then(completionSink.asMono());
}
private StandardWebSocketHandlerAdapter createEndpoint(URI url, WebSocketHandler handler,
MonoProcessor<Void> completion, DefaultConfigurator configurator) {
Sinks.Empty<Void> completion, DefaultConfigurator configurator) {
return new StandardWebSocketHandlerAdapter(handler, session ->
createWebSocketSession(session, createHandshakeInfo(url, configurator), completion));
@ -126,9 +125,18 @@ public class StandardWebSocketClient implements WebSocketClient {
}
protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info,
MonoProcessor<Void> completion) {
Sinks.Empty<Void> completionSink) {
return new StandardWebSocketSession(session, info, DefaultDataBufferFactory.sharedInstance, completion);
return new StandardWebSocketSession(
session, info, DefaultDataBufferFactory.sharedInstance, completionSink);
}
@Deprecated
protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info,
reactor.core.publisher.MonoProcessor<Void> completionMono) {
return new StandardWebSocketSession(
session, info, DefaultDataBufferFactory.sharedInstance, completionMono);
}
private ClientEndpointConfig createEndpointConfig(Configurator configurator, List<String> subProtocols) {

View File

@ -20,7 +20,6 @@ import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.apache.tomcat.websocket.WsWebSocketContainer;
import reactor.core.publisher.MonoProcessor;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession;
@ -45,10 +44,11 @@ public class TomcatWebSocketClient extends StandardWebSocketClient {
@Override
@SuppressWarnings("deprecation")
protected StandardWebSocketSession createWebSocketSession(Session session,
HandshakeInfo info, MonoProcessor<Void> completion) {
HandshakeInfo info, reactor.core.publisher.MonoProcessor<Void> completionMono) {
return new TomcatWebSocketSession(session, info, bufferFactory(), completion);
return new TomcatWebSocketSession(session, info, bufferFactory(), completionMono);
}
}

View File

@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFactory;
import org.xnio.IoFuture;
import org.xnio.XnioWorker;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -155,7 +154,7 @@ public class UndertowWebSocketClient implements WebSocketClient {
}
private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
Sinks.Empty<Void> completionSink = Sinks.empty();
return Mono.fromCallable(
() -> {
if (logger.isDebugEnabled()) {
@ -169,15 +168,17 @@ public class UndertowWebSocketClient implements WebSocketClient {
new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
@Override
public void handleDone(WebSocketChannel channel, Object attachment) {
handleChannel(url, handler, completion, negotiation, channel);
handleChannel(url, handler, completionSink, negotiation, channel);
}
@Override
public void handleFailed(IOException ex, Object attachment) {
completion.onError(new IllegalStateException("Failed to connect to " + url, ex));
// Ignore result: can't overflow, ok if not first or no one listens
completionSink.tryEmitError(
new IllegalStateException("Failed to connect to " + url, ex));
}
}, null);
})
.then(completion);
.then(completionSink.asMono());
}
/**
@ -194,12 +195,12 @@ public class UndertowWebSocketClient implements WebSocketClient {
return builder;
}
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
private void handleChannel(URI url, WebSocketHandler handler, Sinks.Empty<Void> completionSink,
DefaultNegotiation negotiation, WebSocketChannel channel) {
HandshakeInfo info = createHandshakeInfo(url, negotiation);
DataBufferFactory bufferFactory = DefaultDataBufferFactory.sharedInstance;
UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, bufferFactory, completion);
UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, bufferFactory, completionSink);
UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session);
channel.getReceiveSetter().set(adapter);

View File

@ -20,8 +20,6 @@ import java.time.Duration;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@ -91,9 +89,7 @@ class ErrorsMethodArgumentResolverTests {
@Test
void resolveWithMono() {
BindingResult bindingResult = createBindingResult(new Foo(), "foo");
MonoProcessor<BindingResult> monoProcessor = MonoProcessor.fromSink(Sinks.one());
monoProcessor.onNext(bindingResult);
this.bindingContext.getModel().asMap().put(BindingResult.MODEL_KEY_PREFIX + "foo", monoProcessor);
this.bindingContext.getModel().asMap().put(BindingResult.MODEL_KEY_PREFIX + "foo", Mono.just(bindingResult));
MethodParameter parameter = this.testMethod.arg(Errors.class);
Object actual = this.resolver.resolveArgument(parameter, this.bindingContext, this.exchange)

View File

@ -26,7 +26,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;
@ -224,7 +224,9 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private static final Flux<Long> INTERVAL = testInterval(Duration.ofMillis(100), 50);
private MonoProcessor<Void> cancellation = MonoProcessor.fromSink(Sinks.one());
private final Sinks.Empty<Void> cancelSink = Sinks.empty();
private Mono<Void> cancellation = cancelSink.asMono();
@GetMapping("/string")
@ -250,7 +252,7 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
Flux<String> infinite() {
return Flux.just(0, 1).map(l -> "foo " + l)
.mergeWith(Flux.never())
.doOnCancel(() -> cancellation.onComplete());
.doOnCancel(() -> cancelSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST));
}
}

View File

@ -27,8 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;
import org.springframework.context.annotation.Bean;
@ -99,7 +97,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
String protocol = "echo-v1";
AtomicReference<HandshakeInfo> infoRef = new AtomicReference<>();
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.unsafe().one());
AtomicReference<Object> protocolRef = new AtomicReference<>();
this.client.execute(getUrl("/sub-protocol"),
new WebSocketHandler() {
@ -113,7 +111,8 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
infoRef.set(session.getHandshakeInfo());
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.subscribeWith(output)
.doOnNext(protocolRef::set)
.doOnError(protocolRef::set)
.then();
}
})
@ -123,7 +122,7 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
assertThat(info.getHeaders().getFirst("Upgrade")).isEqualToIgnoringCase("websocket");
assertThat(info.getHeaders().getFirst("Sec-WebSocket-Protocol")).isEqualTo(protocol);
assertThat(info.getSubProtocol()).as("Wrong protocol accepted").isEqualTo(protocol);
assertThat(output.block(TIMEOUT)).as("Wrong protocol detected on the server side").isEqualTo(protocol);
assertThat(protocolRef.get()).as("Wrong protocol detected on the server side").isEqualTo(protocol);
}
@ParameterizedWebSocketTest
@ -132,27 +131,28 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
HttpHeaders headers = new HttpHeaders();
headers.add("my-header", "my-value");
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.unsafe().one());
AtomicReference<Object> headerRef = new AtomicReference<>();
this.client.execute(getUrl("/custom-header"), headers,
session -> session.receive()
.map(WebSocketMessage::getPayloadAsText)
.subscribeWith(output)
.doOnNext(headerRef::set)
.doOnError(headerRef::set)
.then())
.block(TIMEOUT);
assertThat(output.block(TIMEOUT)).isEqualTo("my-header:my-value");
assertThat(headerRef.get()).isEqualTo("my-header:my-value");
}
@ParameterizedWebSocketTest
void sessionClosing(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception {
startServer(client, server, serverConfigClass);
MonoProcessor<CloseStatus> statusProcessor = MonoProcessor.fromSink(Sinks.unsafe().one());
AtomicReference<Object> statusRef = new AtomicReference<>();
this.client.execute(getUrl("/close"),
session -> {
logger.debug("Starting..");
session.closeStatus().subscribe(statusProcessor);
session.closeStatus().subscribe(statusRef::set, statusRef::set, () -> {});
return session.receive()
.doOnNext(s -> logger.debug("inbound " + s))
.then()
@ -162,25 +162,26 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
})
.block(TIMEOUT);
assertThat(statusProcessor.block()).isEqualTo(CloseStatus.GOING_AWAY);
assertThat(statusRef.get()).isEqualTo(CloseStatus.GOING_AWAY);
}
@ParameterizedWebSocketTest
void cookie(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception {
startServer(client, server, serverConfigClass);
MonoProcessor<Object> output = MonoProcessor.fromSink(Sinks.unsafe().one());
AtomicReference<String> cookie = new AtomicReference<>();
AtomicReference<Object> receivedCookieRef = new AtomicReference<>();
this.client.execute(getUrl("/cookie"),
session -> {
cookie.set(session.getHandshakeInfo().getHeaders().getFirst("Set-Cookie"));
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.subscribeWith(output)
.doOnNext(receivedCookieRef::set)
.doOnError(receivedCookieRef::set)
.then();
})
.block(TIMEOUT);
assertThat(output.block(TIMEOUT)).isEqualTo("cookie");
assertThat(receivedCookieRef.get()).isEqualTo("cookie");
assertThat(cookie.get()).isEqualTo("project=spring");
}

View File

@ -32,7 +32,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.MethodParameter;
@ -109,12 +108,18 @@ public class ReactiveTypeHandlerTests {
public void deferredResultSubscriberWithOneValue() throws Exception {
// Mono
MonoProcessor<String> mono = MonoProcessor.fromSink(Sinks.one());
testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onNext("foo"), "foo");
Sinks.One<String> sink = Sinks.one();
testDeferredResultSubscriber(
sink.asMono(), Mono.class, forClass(String.class),
() -> sink.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST),
"foo");
// Mono empty
MonoProcessor<String> monoEmpty = MonoProcessor.fromSink(Sinks.one());
testDeferredResultSubscriber(monoEmpty, Mono.class, forClass(String.class), monoEmpty::onComplete, null);
Sinks.One<String> emptySink = Sinks.one();
testDeferredResultSubscriber(
emptySink.asMono(), Mono.class, forClass(String.class),
() -> emptySink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST),
null);
// RxJava Single
AtomicReference<SingleEmitter<String>> ref2 = new AtomicReference<>();
@ -125,8 +130,10 @@ public class ReactiveTypeHandlerTests {
@Test
public void deferredResultSubscriberWithNoValues() throws Exception {
MonoProcessor<String> monoEmpty = MonoProcessor.fromSink(Sinks.one());
testDeferredResultSubscriber(monoEmpty, Mono.class, forClass(String.class), monoEmpty::onComplete, null);
Sinks.One<String> sink = Sinks.one();
testDeferredResultSubscriber(sink.asMono(), Mono.class, forClass(String.class),
() -> sink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST),
null);
}
@Test
@ -152,8 +159,9 @@ public class ReactiveTypeHandlerTests {
IllegalStateException ex = new IllegalStateException();
// Mono
MonoProcessor<String> mono = MonoProcessor.fromSink(Sinks.one());
testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onError(ex), ex);
Sinks.One<String> sink = Sinks.one();
testDeferredResultSubscriber(sink.asMono(), Mono.class, forClass(String.class),
() -> sink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST), ex);
// RxJava Single
AtomicReference<SingleEmitter<String>> ref2 = new AtomicReference<>();