Update after MonoProcessor deprecation in Reactor

This commit adapts the usage of `MonoProcessor` after deprecations
introduced in reactor/reactor-core#1053
This commit is contained in:
Brian Clozel 2020-08-07 18:59:21 +02:00
parent 922f9452f2
commit 617ec359bd
13 changed files with 29 additions and 15 deletions

View File

@ -23,6 +23,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
@ -92,7 +93,7 @@ public interface Decoder<T> {
default T decode(DataBuffer buffer, ResolvableType targetType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {
MonoProcessor<T> processor = MonoProcessor.create();
MonoProcessor<T> processor = MonoProcessor.fromSink(Sinks.one());
decodeToMono(Mono.just(buffer), targetType, mimeType, hints).subscribeWith(processor);
Assert.state(processor.isTerminated(), "DataBuffer decoding should have completed.");

View File

@ -33,6 +33,7 @@ import org.springframework.util.Assert;
* @since 5.1
* @param <T> the object type
*/
@SuppressWarnings("deprecation")
public class MonoToListenableFutureAdapter<T> implements ListenableFuture<T> {
private final MonoProcessor<T> processor;

View File

@ -28,6 +28,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
@ -162,7 +163,7 @@ class MessagingRSocket implements RSocket {
}
private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType, Flux<Payload> payloads) {
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.create();
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.fromSink(Sinks.one());
MessageHeaders headers = createHeaders(firstPayload, frameType, replyMono);
AtomicBoolean read = new AtomicBoolean();

View File

@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
@ -204,7 +205,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
// Report first connect to the ListenableFuture
MonoProcessor<Void> connectMono = MonoProcessor.create();
MonoProcessor<Void> connectMono = MonoProcessor.fromSink(Sinks.one());
this.tcpClient
.handle(new ReactorNettyHandler(handler))
@ -315,7 +316,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
logger.debug("Connected to " + conn.address());
}
});
MonoProcessor<Void> completion = MonoProcessor.create();
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));

View File

@ -26,6 +26,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -64,7 +65,7 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
super(dataBufferFactory);
this.writeHandler = body -> {
// Avoid .then() which causes data buffers to be released
MonoProcessor<Void> completion = MonoProcessor.create();
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache();
this.body.subscribe();
return completion;

View File

@ -25,6 +25,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import org.springframework.core.io.buffer.DataBuffer;
@ -83,8 +84,8 @@ public class HttpHandlerConnector implements ClientHttpConnector {
private Mono<ClientHttpResponse> doConnect(
HttpMethod httpMethod, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
MonoProcessor<Void> requestWriteCompletion = MonoProcessor.create();
MonoProcessor<Void> handlerCompletion = MonoProcessor.create();
MonoProcessor<Void> requestWriteCompletion = MonoProcessor.fromSink(Sinks.one());
MonoProcessor<Void> handlerCompletion = MonoProcessor.fromSink(Sinks.one());
ClientHttpResponse[] savedResponse = new ClientHttpResponse[1];
MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);

View File

@ -27,6 +27,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
@ -132,7 +133,7 @@ class WiretapConnector implements ClientHttpConnector {
private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer();
private final MonoProcessor<byte[]> content = MonoProcessor.create();
private final MonoProcessor<byte[]> content = MonoProcessor.fromSink(Sinks.one());
private boolean hasContentConsumer;

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.core.ParameterNameDiscoverer;
@ -102,7 +103,7 @@ public class SyncInvocableHandlerMethod extends HandlerMethod {
public HandlerResult invokeForHandlerResult(ServerWebExchange exchange,
BindingContext bindingContext, Object... providedArgs) {
MonoProcessor<HandlerResult> processor = MonoProcessor.create();
MonoProcessor<HandlerResult> processor = MonoProcessor.fromSink(Sinks.one());
this.delegate.invoke(exchange, bindingContext, providedArgs).subscribeWith(processor);
if (processor.isTerminated()) {

View File

@ -25,6 +25,7 @@ import java.util.Optional;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.beans.BeanUtils;
import org.springframework.core.DefaultParameterNameDiscoverer;
@ -116,7 +117,7 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR
Mono<?> valueMono = prepareAttributeMono(name, valueType, context, exchange);
Map<String, Object> model = context.getModel().asMap();
MonoProcessor<BindingResult> bindingResultMono = MonoProcessor.create();
MonoProcessor<BindingResult> bindingResultMono = MonoProcessor.fromSink(Sinks.one());
model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResultMono);
return valueMono.flatMap(value -> {

View File

@ -26,6 +26,7 @@ import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -73,7 +74,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
private final AtomicBoolean sendCalled = new AtomicBoolean();
private final MonoProcessor<CloseStatus> closeStatusProcessor = MonoProcessor.create();
private final MonoProcessor<CloseStatus> closeStatusProcessor = MonoProcessor.fromSink(Sinks.one());
/**

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.io.UpgradeListener;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.context.Lifecycle;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
@ -136,7 +137,7 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
}
private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
MonoProcessor<Void> completionMono = MonoProcessor.create();
MonoProcessor<Void> completionMono = MonoProcessor.fromSink(Sinks.one());
return Mono.fromCallable(
() -> {
if (logger.isDebugEnabled()) {

View File

@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -95,7 +96,7 @@ public class StandardWebSocketClient implements WebSocketClient {
}
private Mono<Void> executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
MonoProcessor<Void> completionMono = MonoProcessor.create();
MonoProcessor<Void> completionMono = MonoProcessor.fromSink(Sinks.one());
return Mono.fromCallable(
() -> {
if (logger.isDebugEnabled()) {

View File

@ -34,6 +34,7 @@ import org.xnio.IoFuture;
import org.xnio.XnioWorker;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
@ -72,7 +73,8 @@ public class UndertowWebSocketClient implements WebSocketClient {
* @param worker the Xnio worker
*/
public UndertowWebSocketClient(XnioWorker worker) {
this(worker, builder -> {});
this(worker, builder -> {
});
}
/**
@ -153,7 +155,7 @@ public class UndertowWebSocketClient implements WebSocketClient {
}
private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
MonoProcessor<Void> completion = MonoProcessor.create();
MonoProcessor<Void> completion = MonoProcessor.fromSink(Sinks.one());
return Mono.fromCallable(
() -> {
if (logger.isDebugEnabled()) {