Replace remaining use of deprecated Processors in tests

See gh-25085
This commit is contained in:
Rossen Stoyanchev 2020-06-23 20:29:35 +01:00
parent 0e83aaa756
commit 22bf62def1
5 changed files with 75 additions and 75 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -32,11 +32,10 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.lang.Nullable;
@ -343,8 +342,8 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/mono");
this.messageHandler.handleMessage(message);
assertThat(controller.mono).isNotNull();
controller.mono.onNext("foo");
assertThat(controller.monoProcessor).isNotNull();
controller.monoProcessor.onNext("foo");
verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
assertThat(this.payloadCaptor.getValue()).isEqualTo("foo");
}
@ -358,7 +357,7 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/mono");
this.messageHandler.handleMessage(message);
controller.mono.onError(new IllegalStateException());
controller.monoProcessor.onError(new IllegalStateException());
assertThat(controller.exceptionCaught).isTrue();
}
@ -371,14 +370,14 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/flux");
this.messageHandler.handleMessage(message);
assertThat(controller.flux).isNotNull();
controller.flux.onNext("foo");
assertThat(controller.fluxSink).isNotNull();
controller.fluxSink.next("foo");
verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class));
}
@Test
public void placeholder() throws Exception {
public void placeholder() {
Message<?> message = createMessage("/pre/myValue");
this.messageHandler.setEmbeddedValueResolver(value -> ("/${myProperty}".equals(value) ? "/myValue" : value));
this.messageHandler.registerHandler(this.testController);
@ -586,22 +585,22 @@ public class SimpAnnotationMethodMessageHandlerTests {
@Controller
private static class ReactiveController {
private MonoProcessor<String> mono;
private MonoProcessor<String> monoProcessor;
private FluxProcessor<String, String> flux;
private Sinks.StandaloneFluxSink<String> fluxSink;
private boolean exceptionCaught = false;
@MessageMapping("mono")
public Mono<String> handleMono() {
this.mono = MonoProcessor.create();
return this.mono;
this.monoProcessor = MonoProcessor.create();
return this.monoProcessor;
}
@MessageMapping("flux")
public Flux<String> handleFlux() {
this.flux = EmitterProcessor.create();
return this.flux;
this.fluxSink = Sinks.unicast();
return this.fluxSink.asFlux();
}
@MessageExceptionHandler(IllegalStateException.class)

View File

@ -26,6 +26,7 @@ import java.util.Map;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.UnicastProcessor;
import org.springframework.core.ResolvableType;
@ -207,12 +208,12 @@ public class MultipartHttpMessageWriterTests extends AbstractLeakCheckingTests {
@Test // SPR-16402
public void singleSubscriberWithResource() throws IOException {
UnicastProcessor<Resource> processor = UnicastProcessor.create();
Sinks.StandaloneFluxSink<Resource> sink = Sinks.unicast();
Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg");
Mono.just(logo).subscribe(processor);
sink.next(logo);
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
bodyBuilder.asyncPart("logo", processor, Resource.class);
bodyBuilder.asyncPart("logo", sink.asFlux(), Resource.class);
Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());

View File

@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.util.retry.Retry;
import org.springframework.context.annotation.Bean;
@ -81,15 +80,16 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
private void testEcho() {
int count = 100;
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
ReplayProcessor<Object> output = ReplayProcessor.create(count);
this.client.execute(getUrl("/echo"), session -> session
.send(input.map(session::textMessage))
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
.subscribeWith(output)
.then())
AtomicReference<List<String>> actualRef = new AtomicReference<>();
this.client.execute(getUrl("/echo"), session ->
session.send(input.map(session::textMessage))
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
.collectList()
.doOnNext(actualRef::set)
.then())
.block(TIMEOUT);
assertThat(output.isTerminated()).isTrue();
assertThat(output.collectList().block()).isEqualTo(input.collectList().block());
assertThat(actualRef.get()).isNotNull();
assertThat(actualRef.get()).isEqualTo(input.collectList().block());
}
@ParameterizedWebSocketTest

View File

@ -30,10 +30,10 @@ import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleEmitter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@ -138,11 +138,11 @@ public class ReactiveTypeHandlerTests {
Bar bar1 = new Bar("foo");
Bar bar2 = new Bar("bar");
EmitterProcessor<Bar> emitter = EmitterProcessor.create();
testDeferredResultSubscriber(emitter, Flux.class, forClass(Bar.class), () -> {
emitter.onNext(bar1);
emitter.onNext(bar2);
emitter.onComplete();
Sinks.StandaloneFluxSink<Bar> sink = Sinks.unicast();
testDeferredResultSubscriber(sink.asFlux(), Flux.class, forClass(Bar.class), () -> {
sink.next(bar1);
sink.next(bar2);
sink.complete();
}, Arrays.asList(bar1, bar2));
}
@ -189,16 +189,16 @@ public class ReactiveTypeHandlerTests {
public void writeServerSentEvents() throws Exception {
this.servletRequest.addHeader("Accept", "text/event-stream");
EmitterProcessor<String> processor = EmitterProcessor.create();
SseEmitter sseEmitter = (SseEmitter) handleValue(processor, Flux.class, forClass(String.class));
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, forClass(String.class));
EmitterHandler emitterHandler = new EmitterHandler();
sseEmitter.initialize(emitterHandler);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();
sink.next("foo");
sink.next("bar");
sink.next("baz");
sink.complete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
}
@ -208,16 +208,16 @@ public class ReactiveTypeHandlerTests {
ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);
EmitterProcessor<ServerSentEvent<?>> processor = EmitterProcessor.create();
SseEmitter sseEmitter = (SseEmitter) handleValue(processor, Flux.class, type);
Sinks.StandaloneFluxSink<ServerSentEvent<?>> sink = Sinks.unicast();
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, type);
EmitterHandler emitterHandler = new EmitterHandler();
sseEmitter.initialize(emitterHandler);
processor.onNext(ServerSentEvent.builder("foo").id("1").build());
processor.onNext(ServerSentEvent.builder("bar").id("2").build());
processor.onNext(ServerSentEvent.builder("baz").id("3").build());
processor.onComplete();
sink.next(ServerSentEvent.builder("foo").id("1").build());
sink.next(ServerSentEvent.builder("bar").id("2").build());
sink.next(ServerSentEvent.builder("baz").id("3").build());
sink.complete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n");
}
@ -227,8 +227,8 @@ public class ReactiveTypeHandlerTests {
this.servletRequest.addHeader("Accept", "application/stream+json");
EmitterProcessor<Bar> processor = EmitterProcessor.create();
ResponseBodyEmitter emitter = handleValue(processor, Flux.class, forClass(Bar.class));
Sinks.StandaloneFluxSink<Bar> sink = Sinks.unicast();
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(Bar.class));
EmitterHandler emitterHandler = new EmitterHandler();
emitter.initialize(emitterHandler);
@ -239,9 +239,9 @@ public class ReactiveTypeHandlerTests {
Bar bar1 = new Bar("foo");
Bar bar2 = new Bar("bar");
processor.onNext(bar1);
processor.onNext(bar2);
processor.onComplete();
sink.next(bar1);
sink.next(bar2);
sink.complete();
assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/stream+json");
assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n"));
@ -250,16 +250,16 @@ public class ReactiveTypeHandlerTests {
@Test
public void writeText() throws Exception {
EmitterProcessor<String> processor = EmitterProcessor.create();
ResponseBodyEmitter emitter = handleValue(processor, Flux.class, forClass(String.class));
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class));
EmitterHandler emitterHandler = new EmitterHandler();
emitter.initialize(emitterHandler);
processor.onNext("The quick");
processor.onNext(" brown fox jumps over ");
processor.onNext("the lazy dog");
processor.onComplete();
sink.next("The quick");
sink.next(" brown fox jumps over ");
sink.next("the lazy dog");
sink.complete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog");
}

View File

@ -23,8 +23,8 @@ import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
@ -227,16 +227,16 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
this.request.addHeader("Accept", "text/event-stream");
MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class);
EmitterProcessor<String> processor = EmitterProcessor.create();
this.handler.handleReturnValue(processor, type, this.mavContainer, this.webRequest);
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest);
assertThat(this.request.isAsyncStarted()).isTrue();
assertThat(this.response.getStatus()).isEqualTo(200);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();
sink.next("foo");
sink.next("bar");
sink.next("baz");
sink.complete();
assertThat(this.response.getContentType()).isEqualTo("text/event-stream");
assertThat(this.response.getContentAsString()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
@ -248,14 +248,14 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
this.request.addHeader("Accept", "text/event-stream");
MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class);
EmitterProcessor<String> processor = EmitterProcessor.create();
this.handler.handleReturnValue(processor, type, this.mavContainer, this.webRequest);
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest);
assertThat(this.request.isAsyncStarted()).isTrue();
IllegalStateException ex = new IllegalStateException("wah wah");
processor.onError(ex);
processor.onComplete();
sink.error(ex);
sink.complete();
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.webRequest);
assertThat(asyncManager.getConcurrentResult()).isSameAs(ex);
@ -290,8 +290,8 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
@Test
public void responseEntityFlux() throws Exception {
EmitterProcessor<String> processor = EmitterProcessor.create();
ResponseEntity<Flux<String>> entity = ResponseEntity.ok().body(processor);
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
ResponseEntity<Flux<String>> entity = ResponseEntity.ok().body(sink.asFlux());
ResolvableType bodyType = forClassWithGenerics(Flux.class, String.class);
MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType);
this.handler.handleReturnValue(entity, type, this.mavContainer, this.webRequest);
@ -299,10 +299,10 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
assertThat(this.request.isAsyncStarted()).isTrue();
assertThat(this.response.getStatus()).isEqualTo(200);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();
sink.next("foo");
sink.next("bar");
sink.next("baz");
sink.complete();
assertThat(this.response.getContentType()).isEqualTo("text/plain");
assertThat(this.response.getContentAsString()).isEqualTo("foobarbaz");
@ -311,8 +311,8 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
@Test // SPR-17076
public void responseEntityFluxWithCustomHeader() throws Exception {
EmitterProcessor<SimpleBean> processor = EmitterProcessor.create();
ResponseEntity<Flux<SimpleBean>> entity = ResponseEntity.ok().header("x-foo", "bar").body(processor);
Sinks.StandaloneFluxSink<SimpleBean> sink = Sinks.unicast();
ResponseEntity<Flux<SimpleBean>> entity = ResponseEntity.ok().header("x-foo", "bar").body(sink.asFlux());
ResolvableType bodyType = forClassWithGenerics(Flux.class, SimpleBean.class);
MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType);
this.handler.handleReturnValue(entity, type, this.mavContainer, this.webRequest);