diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java index 4672127e0d2..d64a827b0be 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,11 +32,10 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxProcessor; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.context.support.StaticApplicationContext; import org.springframework.lang.Nullable; @@ -343,8 +342,8 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/mono"); this.messageHandler.handleMessage(message); - assertThat(controller.mono).isNotNull(); - controller.mono.onNext("foo"); + assertThat(controller.monoProcessor).isNotNull(); + controller.monoProcessor.onNext("foo"); verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class)); assertThat(this.payloadCaptor.getValue()).isEqualTo("foo"); } @@ -358,7 +357,7 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/mono"); this.messageHandler.handleMessage(message); - controller.mono.onError(new IllegalStateException()); + controller.monoProcessor.onError(new IllegalStateException()); assertThat(controller.exceptionCaught).isTrue(); } @@ -371,14 +370,14 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/flux"); this.messageHandler.handleMessage(message); - assertThat(controller.flux).isNotNull(); - controller.flux.onNext("foo"); + assertThat(controller.fluxSink).isNotNull(); + controller.fluxSink.next("foo"); verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class)); } @Test - public void placeholder() throws Exception { + public void placeholder() { Message message = createMessage("/pre/myValue"); this.messageHandler.setEmbeddedValueResolver(value -> ("/${myProperty}".equals(value) ? "/myValue" : value)); this.messageHandler.registerHandler(this.testController); @@ -586,22 +585,22 @@ public class SimpAnnotationMethodMessageHandlerTests { @Controller private static class ReactiveController { - private MonoProcessor mono; + private MonoProcessor monoProcessor; - private FluxProcessor flux; + private Sinks.StandaloneFluxSink fluxSink; private boolean exceptionCaught = false; @MessageMapping("mono") public Mono handleMono() { - this.mono = MonoProcessor.create(); - return this.mono; + this.monoProcessor = MonoProcessor.create(); + return this.monoProcessor; } @MessageMapping("flux") public Flux handleFlux() { - this.flux = EmitterProcessor.create(); - return this.flux; + this.fluxSink = Sinks.unicast(); + return this.fluxSink.asFlux(); } @MessageExceptionHandler(IllegalStateException.class) diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java index 86036bf60a2..047ce1996e8 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java @@ -26,6 +26,7 @@ import java.util.Map; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import reactor.core.publisher.UnicastProcessor; import org.springframework.core.ResolvableType; @@ -207,12 +208,12 @@ public class MultipartHttpMessageWriterTests extends AbstractLeakCheckingTests { @Test // SPR-16402 public void singleSubscriberWithResource() throws IOException { - UnicastProcessor processor = UnicastProcessor.create(); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg"); - Mono.just(logo).subscribe(processor); + sink.next(logo); MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder(); - bodyBuilder.asyncPart("logo", processor, Resource.class); + bodyBuilder.asyncPart("logo", sink.asFlux(), Resource.class); Mono>> result = Mono.just(bodyBuilder.build()); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index f85d5ea39c6..64b3d70e57b 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.ReplayProcessor; import reactor.util.retry.Retry; import org.springframework.context.annotation.Bean; @@ -81,15 +80,16 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { private void testEcho() { int count = 100; Flux input = Flux.range(1, count).map(index -> "msg-" + index); - ReplayProcessor output = ReplayProcessor.create(count); - this.client.execute(getUrl("/echo"), session -> session - .send(input.map(session::textMessage)) - .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText)) - .subscribeWith(output) - .then()) + AtomicReference> actualRef = new AtomicReference<>(); + this.client.execute(getUrl("/echo"), session -> + session.send(input.map(session::textMessage)) + .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText)) + .collectList() + .doOnNext(actualRef::set) + .then()) .block(TIMEOUT); - assertThat(output.isTerminated()).isTrue(); - assertThat(output.collectList().block()).isEqualTo(input.collectList().block()); + assertThat(actualRef.get()).isNotNull(); + assertThat(actualRef.get()).isEqualTo(input.collectList().block()); } @ParameterizedWebSocketTest diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java index 6b663551da7..3eb396c9d0b 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java @@ -30,10 +30,10 @@ import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.core.SingleEmitter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import org.springframework.core.MethodParameter; import org.springframework.core.ReactiveAdapterRegistry; @@ -138,11 +138,11 @@ public class ReactiveTypeHandlerTests { Bar bar1 = new Bar("foo"); Bar bar2 = new Bar("bar"); - EmitterProcessor emitter = EmitterProcessor.create(); - testDeferredResultSubscriber(emitter, Flux.class, forClass(Bar.class), () -> { - emitter.onNext(bar1); - emitter.onNext(bar2); - emitter.onComplete(); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); + testDeferredResultSubscriber(sink.asFlux(), Flux.class, forClass(Bar.class), () -> { + sink.next(bar1); + sink.next(bar2); + sink.complete(); }, Arrays.asList(bar1, bar2)); } @@ -189,16 +189,16 @@ public class ReactiveTypeHandlerTests { public void writeServerSentEvents() throws Exception { this.servletRequest.addHeader("Accept", "text/event-stream"); - EmitterProcessor processor = EmitterProcessor.create(); - SseEmitter sseEmitter = (SseEmitter) handleValue(processor, Flux.class, forClass(String.class)); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); + SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, forClass(String.class)); EmitterHandler emitterHandler = new EmitterHandler(); sseEmitter.initialize(emitterHandler); - processor.onNext("foo"); - processor.onNext("bar"); - processor.onNext("baz"); - processor.onComplete(); + sink.next("foo"); + sink.next("bar"); + sink.next("baz"); + sink.complete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n"); } @@ -208,16 +208,16 @@ public class ReactiveTypeHandlerTests { ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class); - EmitterProcessor> processor = EmitterProcessor.create(); - SseEmitter sseEmitter = (SseEmitter) handleValue(processor, Flux.class, type); + Sinks.StandaloneFluxSink> sink = Sinks.unicast(); + SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, type); EmitterHandler emitterHandler = new EmitterHandler(); sseEmitter.initialize(emitterHandler); - processor.onNext(ServerSentEvent.builder("foo").id("1").build()); - processor.onNext(ServerSentEvent.builder("bar").id("2").build()); - processor.onNext(ServerSentEvent.builder("baz").id("3").build()); - processor.onComplete(); + sink.next(ServerSentEvent.builder("foo").id("1").build()); + sink.next(ServerSentEvent.builder("bar").id("2").build()); + sink.next(ServerSentEvent.builder("baz").id("3").build()); + sink.complete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n"); } @@ -227,8 +227,8 @@ public class ReactiveTypeHandlerTests { this.servletRequest.addHeader("Accept", "application/stream+json"); - EmitterProcessor processor = EmitterProcessor.create(); - ResponseBodyEmitter emitter = handleValue(processor, Flux.class, forClass(Bar.class)); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); + ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(Bar.class)); EmitterHandler emitterHandler = new EmitterHandler(); emitter.initialize(emitterHandler); @@ -239,9 +239,9 @@ public class ReactiveTypeHandlerTests { Bar bar1 = new Bar("foo"); Bar bar2 = new Bar("bar"); - processor.onNext(bar1); - processor.onNext(bar2); - processor.onComplete(); + sink.next(bar1); + sink.next(bar2); + sink.complete(); assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/stream+json"); assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n")); @@ -250,16 +250,16 @@ public class ReactiveTypeHandlerTests { @Test public void writeText() throws Exception { - EmitterProcessor processor = EmitterProcessor.create(); - ResponseBodyEmitter emitter = handleValue(processor, Flux.class, forClass(String.class)); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); + ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class)); EmitterHandler emitterHandler = new EmitterHandler(); emitter.initialize(emitterHandler); - processor.onNext("The quick"); - processor.onNext(" brown fox jumps over "); - processor.onNext("the lazy dog"); - processor.onComplete(); + sink.next("The quick"); + sink.next(" brown fox jumps over "); + sink.next("the lazy dog"); + sink.complete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog"); } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java index 8fff8455e29..53a5214588b 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java @@ -23,8 +23,8 @@ import java.util.function.Consumer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; @@ -227,16 +227,16 @@ public class ResponseBodyEmitterReturnValueHandlerTests { this.request.addHeader("Accept", "text/event-stream"); MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class); - EmitterProcessor processor = EmitterProcessor.create(); - this.handler.handleReturnValue(processor, type, this.mavContainer, this.webRequest); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); + this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest); assertThat(this.request.isAsyncStarted()).isTrue(); assertThat(this.response.getStatus()).isEqualTo(200); - processor.onNext("foo"); - processor.onNext("bar"); - processor.onNext("baz"); - processor.onComplete(); + sink.next("foo"); + sink.next("bar"); + sink.next("baz"); + sink.complete(); assertThat(this.response.getContentType()).isEqualTo("text/event-stream"); assertThat(this.response.getContentAsString()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n"); @@ -248,14 +248,14 @@ public class ResponseBodyEmitterReturnValueHandlerTests { this.request.addHeader("Accept", "text/event-stream"); MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class); - EmitterProcessor processor = EmitterProcessor.create(); - this.handler.handleReturnValue(processor, type, this.mavContainer, this.webRequest); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); + this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest); assertThat(this.request.isAsyncStarted()).isTrue(); IllegalStateException ex = new IllegalStateException("wah wah"); - processor.onError(ex); - processor.onComplete(); + sink.error(ex); + sink.complete(); WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.webRequest); assertThat(asyncManager.getConcurrentResult()).isSameAs(ex); @@ -290,8 +290,8 @@ public class ResponseBodyEmitterReturnValueHandlerTests { @Test public void responseEntityFlux() throws Exception { - EmitterProcessor processor = EmitterProcessor.create(); - ResponseEntity> entity = ResponseEntity.ok().body(processor); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); + ResponseEntity> entity = ResponseEntity.ok().body(sink.asFlux()); ResolvableType bodyType = forClassWithGenerics(Flux.class, String.class); MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType); this.handler.handleReturnValue(entity, type, this.mavContainer, this.webRequest); @@ -299,10 +299,10 @@ public class ResponseBodyEmitterReturnValueHandlerTests { assertThat(this.request.isAsyncStarted()).isTrue(); assertThat(this.response.getStatus()).isEqualTo(200); - processor.onNext("foo"); - processor.onNext("bar"); - processor.onNext("baz"); - processor.onComplete(); + sink.next("foo"); + sink.next("bar"); + sink.next("baz"); + sink.complete(); assertThat(this.response.getContentType()).isEqualTo("text/plain"); assertThat(this.response.getContentAsString()).isEqualTo("foobarbaz"); @@ -311,8 +311,8 @@ public class ResponseBodyEmitterReturnValueHandlerTests { @Test // SPR-17076 public void responseEntityFluxWithCustomHeader() throws Exception { - EmitterProcessor processor = EmitterProcessor.create(); - ResponseEntity> entity = ResponseEntity.ok().header("x-foo", "bar").body(processor); + Sinks.StandaloneFluxSink sink = Sinks.unicast(); + ResponseEntity> entity = ResponseEntity.ok().header("x-foo", "bar").body(sink.asFlux()); ResolvableType bodyType = forClassWithGenerics(Flux.class, SimpleBean.class); MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType); this.handler.handleReturnValue(entity, type, this.mavContainer, this.webRequest);