diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java index b2d1bf2603..d290b0d695 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java @@ -148,7 +148,8 @@ class RSocketBufferLeakTests { } @Test // gh-24741 - @Disabled // pending https://github.com/rsocket/rsocket-java/pull/777 + @Disabled + // pending https://github.com/rsocket/rsocket-java/pull/777 void noSuchRouteOnChannelInteraction() { Flux input = Flux.just("foo", "bar", "baz"); Flux result = requester.route("no-such-route").data(input).retrieveFlux(String.class); @@ -245,7 +246,7 @@ class RSocketBufferLeakTests { void checkForLeaks() { this.rsockets.stream().map(PayloadSavingDecorator::getPayloads) .forEach(payloadInfoProcessor -> { - payloadInfoProcessor.complete(); + payloadInfoProcessor.emitComplete(); payloadInfoProcessor.asFlux() .doOnNext(this::checkForLeak) .blockLast(); @@ -290,18 +291,18 @@ class RSocketBufferLeakTests { private final RSocket delegate; - private Sinks.StandaloneFluxSink payloads = Sinks.replayAll(); + private Sinks.Many payloads = Sinks.many().replay().all(); PayloadSavingDecorator(RSocket delegate) { this.delegate = delegate; } - Sinks.StandaloneFluxSink getPayloads() { + Sinks.Many getPayloads() { return this.payloads; } void reset() { - this.payloads = Sinks.replayAll(); + this.payloads = Sinks.many().replay().all(); } @Override @@ -327,7 +328,7 @@ class RSocketBufferLeakTests { } private io.rsocket.Payload addPayload(io.rsocket.Payload payload) { - this.payloads.next(new PayloadLeakInfo(payload)); + this.payloads.emitNext(new PayloadLeakInfo(payload)); return payload; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index e6bb2cb2a1..68210b3afd 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -224,14 +224,14 @@ public class RSocketClientToServerIntegrationTests { @Controller static class ServerController { - final Sinks.StandaloneFluxSink fireForgetPayloads = Sinks.replayAll(); + final Sinks.Many fireForgetPayloads = Sinks.many().replay().all(); - final Sinks.StandaloneFluxSink metadataPushPayloads = Sinks.replayAll(); + final Sinks.Many metadataPushPayloads = Sinks.many().replay().all(); @MessageMapping("receive") void receive(String payload) { - this.fireForgetPayloads.next(payload); + this.fireForgetPayloads.emitNext(payload); } @MessageMapping("echo") @@ -273,7 +273,7 @@ public class RSocketClientToServerIntegrationTests { @ConnectMapping("foo-updates") public void handleMetadata(@Header("foo") String foo) { - this.metadataPushPayloads.next(foo); + this.metadataPushPayloads.emitNext(foo); } @MessageExceptionHandler diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 29968cafd0..13931d6d6d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -217,11 +217,11 @@ public class RSocketServerToClientIntegrationTests { private static class ClientHandler { - final Sinks.StandaloneFluxSink fireForgetPayloads = Sinks.replayAll(); + final Sinks.Many fireForgetPayloads = Sinks.many().replay().all(); @MessageMapping("receive") void receive(String payload) { - this.fireForgetPayloads.next(payload); + this.fireForgetPayloads.emitNext(payload); } @MessageMapping("echo") diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java index d64a827b0b..08be6ab680 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java @@ -371,7 +371,7 @@ public class SimpAnnotationMethodMessageHandlerTests { this.messageHandler.handleMessage(message); assertThat(controller.fluxSink).isNotNull(); - controller.fluxSink.next("foo"); + controller.fluxSink.emitNext("foo"); verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class)); } @@ -587,7 +587,7 @@ public class SimpAnnotationMethodMessageHandlerTests { private MonoProcessor monoProcessor; - private Sinks.StandaloneFluxSink fluxSink; + private Sinks.Many fluxSink; private boolean exceptionCaught = false; @@ -599,7 +599,7 @@ public class SimpAnnotationMethodMessageHandlerTests { @MessageMapping("flux") public Flux handleFlux() { - this.fluxSink = Sinks.unicast(); + this.fluxSink = Sinks.many().unicast().onBackpressureBuffer(); return this.fluxSink.asFlux(); } diff --git a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt index af2dc9f9e5..c899ef0c31 100644 --- a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt +++ b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt @@ -145,17 +145,17 @@ class RSocketClientToServerCoroutinesIntegrationTests { @Controller class ServerController { - val fireForgetPayloads = Sinks.replayAll() + val fireForgetPayloads = Sinks.many().replay().all() @MessageMapping("receive") fun receive(payload: String) { - fireForgetPayloads.next(payload) + fireForgetPayloads.emitNext(payload) } @MessageMapping("receive-async") suspend fun receiveAsync(payload: String) { delay(10) - fireForgetPayloads.next(payload) + fireForgetPayloads.emitNext(payload) } @MessageMapping("echo-async") diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java index f91a4462cc..c3318ffa43 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java @@ -207,9 +207,9 @@ public class MultipartHttpMessageWriterTests extends AbstractLeakCheckingTests { @Test // SPR-16402 public void singleSubscriberWithResource() throws IOException { - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg"); - sink.next(logo); + sink.emitNext(logo); MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder(); bodyBuilder.asyncPart("logo", sink.asFlux(), Resource.class); diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java index ca4f004d18..aa5ace4614 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java @@ -138,11 +138,11 @@ public class ReactiveTypeHandlerTests { Bar bar1 = new Bar("foo"); Bar bar2 = new Bar("bar"); - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); testDeferredResultSubscriber(sink.asFlux(), Flux.class, forClass(Bar.class), () -> { - sink.next(bar1); - sink.next(bar2); - sink.complete(); + sink.emitNext(bar1); + sink.emitNext(bar2); + sink.emitComplete(); }, Arrays.asList(bar1, bar2)); } @@ -189,16 +189,16 @@ public class ReactiveTypeHandlerTests { public void writeServerSentEvents() throws Exception { this.servletRequest.addHeader("Accept", "text/event-stream"); - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, forClass(String.class)); EmitterHandler emitterHandler = new EmitterHandler(); sseEmitter.initialize(emitterHandler); - sink.next("foo"); - sink.next("bar"); - sink.next("baz"); - sink.complete(); + sink.emitNext("foo"); + sink.emitNext("bar"); + sink.emitNext("baz"); + sink.emitComplete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n"); } @@ -208,16 +208,16 @@ public class ReactiveTypeHandlerTests { ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class); - Sinks.StandaloneFluxSink> sink = Sinks.unicast(); + Sinks.Many> sink = Sinks.many().unicast().onBackpressureBuffer(); SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, type); EmitterHandler emitterHandler = new EmitterHandler(); sseEmitter.initialize(emitterHandler); - sink.next(ServerSentEvent.builder("foo").id("1").build()); - sink.next(ServerSentEvent.builder("bar").id("2").build()); - sink.next(ServerSentEvent.builder("baz").id("3").build()); - sink.complete(); + sink.emitNext(ServerSentEvent.builder("foo").id("1").build()); + sink.emitNext(ServerSentEvent.builder("bar").id("2").build()); + sink.emitNext(ServerSentEvent.builder("baz").id("3").build()); + sink.emitComplete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n"); } @@ -227,7 +227,7 @@ public class ReactiveTypeHandlerTests { this.servletRequest.addHeader("Accept", "application/x-ndjson"); - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(Bar.class)); EmitterHandler emitterHandler = new EmitterHandler(); @@ -239,9 +239,9 @@ public class ReactiveTypeHandlerTests { Bar bar1 = new Bar("foo"); Bar bar2 = new Bar("bar"); - sink.next(bar1); - sink.next(bar2); - sink.complete(); + sink.emitNext(bar1); + sink.emitNext(bar2); + sink.emitComplete(); assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/x-ndjson"); assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n")); @@ -250,16 +250,16 @@ public class ReactiveTypeHandlerTests { @Test public void writeText() throws Exception { - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class)); EmitterHandler emitterHandler = new EmitterHandler(); emitter.initialize(emitterHandler); - sink.next("The quick"); - sink.next(" brown fox jumps over "); - sink.next("the lazy dog"); - sink.complete(); + sink.emitNext("The quick"); + sink.emitNext(" brown fox jumps over "); + sink.emitNext("the lazy dog"); + sink.emitComplete(); assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog"); } diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java index 53a5214588..62fed8fa03 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java @@ -227,16 +227,16 @@ public class ResponseBodyEmitterReturnValueHandlerTests { this.request.addHeader("Accept", "text/event-stream"); MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class); - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest); assertThat(this.request.isAsyncStarted()).isTrue(); assertThat(this.response.getStatus()).isEqualTo(200); - sink.next("foo"); - sink.next("bar"); - sink.next("baz"); - sink.complete(); + sink.emitNext("foo"); + sink.emitNext("bar"); + sink.emitNext("baz"); + sink.emitComplete(); assertThat(this.response.getContentType()).isEqualTo("text/event-stream"); assertThat(this.response.getContentAsString()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n"); @@ -248,14 +248,14 @@ public class ResponseBodyEmitterReturnValueHandlerTests { this.request.addHeader("Accept", "text/event-stream"); MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class); - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest); assertThat(this.request.isAsyncStarted()).isTrue(); IllegalStateException ex = new IllegalStateException("wah wah"); - sink.error(ex); - sink.complete(); + sink.emitError(ex); + sink.emitComplete(); WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.webRequest); assertThat(asyncManager.getConcurrentResult()).isSameAs(ex); @@ -290,7 +290,7 @@ public class ResponseBodyEmitterReturnValueHandlerTests { @Test public void responseEntityFlux() throws Exception { - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); ResponseEntity> entity = ResponseEntity.ok().body(sink.asFlux()); ResolvableType bodyType = forClassWithGenerics(Flux.class, String.class); MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType); @@ -299,10 +299,10 @@ public class ResponseBodyEmitterReturnValueHandlerTests { assertThat(this.request.isAsyncStarted()).isTrue(); assertThat(this.response.getStatus()).isEqualTo(200); - sink.next("foo"); - sink.next("bar"); - sink.next("baz"); - sink.complete(); + sink.emitNext("foo"); + sink.emitNext("bar"); + sink.emitNext("baz"); + sink.emitComplete(); assertThat(this.response.getContentType()).isEqualTo("text/plain"); assertThat(this.response.getContentAsString()).isEqualTo("foobarbaz"); @@ -311,7 +311,7 @@ public class ResponseBodyEmitterReturnValueHandlerTests { @Test // SPR-17076 public void responseEntityFluxWithCustomHeader() throws Exception { - Sinks.StandaloneFluxSink sink = Sinks.unicast(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); ResponseEntity> entity = ResponseEntity.ok().header("x-foo", "bar").body(sink.asFlux()); ResolvableType bodyType = forClassWithGenerics(Flux.class, SimpleBean.class); MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType);