Prepare for Sinks API updates in Reactor

See reactor/reactor-core#2374
All usages of this API are in tests, which are not checking overflow or
concurrent emissions - so a simple replacement with `try***` equivalents
is fine.
This commit is contained in:
Brian Clozel 2020-09-11 16:34:31 +02:00
parent af3dc5d440
commit b2a0978c12
8 changed files with 37 additions and 37 deletions

View File

@ -246,7 +246,7 @@ class RSocketBufferLeakTests {
void checkForLeaks() {
this.rsockets.stream().map(PayloadSavingDecorator::getPayloads)
.forEach(payloadInfoProcessor -> {
payloadInfoProcessor.emitComplete();
payloadInfoProcessor.tryEmitComplete();
payloadInfoProcessor.asFlux()
.doOnNext(this::checkForLeak)
.blockLast();
@ -328,7 +328,7 @@ class RSocketBufferLeakTests {
}
private io.rsocket.Payload addPayload(io.rsocket.Payload payload) {
this.payloads.emitNext(new PayloadLeakInfo(payload));
this.payloads.tryEmitNext(new PayloadLeakInfo(payload));
return payload;
}

View File

@ -231,7 +231,7 @@ public class RSocketClientToServerIntegrationTests {
@MessageMapping("receive")
void receive(String payload) {
this.fireForgetPayloads.emitNext(payload);
this.fireForgetPayloads.tryEmitNext(payload);
}
@MessageMapping("echo")
@ -273,7 +273,7 @@ public class RSocketClientToServerIntegrationTests {
@ConnectMapping("foo-updates")
public void handleMetadata(@Header("foo") String foo) {
this.metadataPushPayloads.emitNext(foo);
this.metadataPushPayloads.tryEmitNext(foo);
}
@MessageExceptionHandler

View File

@ -219,7 +219,7 @@ class RSocketServerToClientIntegrationTests {
@MessageMapping("receive")
void receive(String payload) {
this.fireForgetPayloads.emitNext(payload);
this.fireForgetPayloads.tryEmitNext(payload);
}
@MessageMapping("echo")

View File

@ -371,7 +371,7 @@ public class SimpAnnotationMethodMessageHandlerTests {
this.messageHandler.handleMessage(message);
assertThat(controller.fluxSink).isNotNull();
controller.fluxSink.emitNext("foo");
controller.fluxSink.tryEmitNext("foo");
verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class));
}

View File

@ -149,13 +149,13 @@ class RSocketClientToServerCoroutinesIntegrationTests {
@MessageMapping("receive")
fun receive(payload: String) {
fireForgetPayloads.emitNext(payload)
fireForgetPayloads.tryEmitNext(payload)
}
@MessageMapping("receive-async")
suspend fun receiveAsync(payload: String) {
delay(10)
fireForgetPayloads.emitNext(payload)
fireForgetPayloads.tryEmitNext(payload)
}
@MessageMapping("echo-async")

View File

@ -209,7 +209,7 @@ public class MultipartHttpMessageWriterTests extends AbstractLeakCheckingTests {
public void singleSubscriberWithResource() throws IOException {
Sinks.Many<Resource> sink = Sinks.many().unicast().onBackpressureBuffer();
Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg");
sink.emitNext(logo);
sink.tryEmitNext(logo);
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
bodyBuilder.asyncPart("logo", sink.asFlux(), Resource.class);

View File

@ -140,9 +140,9 @@ public class ReactiveTypeHandlerTests {
Sinks.Many<Bar> sink = Sinks.many().unicast().onBackpressureBuffer();
testDeferredResultSubscriber(sink.asFlux(), Flux.class, forClass(Bar.class), () -> {
sink.emitNext(bar1);
sink.emitNext(bar2);
sink.emitComplete();
sink.tryEmitNext(bar1);
sink.tryEmitNext(bar2);
sink.tryEmitComplete();
}, Arrays.asList(bar1, bar2));
}
@ -195,10 +195,10 @@ public class ReactiveTypeHandlerTests {
EmitterHandler emitterHandler = new EmitterHandler();
sseEmitter.initialize(emitterHandler);
sink.emitNext("foo");
sink.emitNext("bar");
sink.emitNext("baz");
sink.emitComplete();
sink.tryEmitNext("foo");
sink.tryEmitNext("bar");
sink.tryEmitNext("baz");
sink.tryEmitComplete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
}
@ -214,10 +214,10 @@ public class ReactiveTypeHandlerTests {
EmitterHandler emitterHandler = new EmitterHandler();
sseEmitter.initialize(emitterHandler);
sink.emitNext(ServerSentEvent.builder("foo").id("1").build());
sink.emitNext(ServerSentEvent.builder("bar").id("2").build());
sink.emitNext(ServerSentEvent.builder("baz").id("3").build());
sink.emitComplete();
sink.tryEmitNext(ServerSentEvent.builder("foo").id("1").build());
sink.tryEmitNext(ServerSentEvent.builder("bar").id("2").build());
sink.tryEmitNext(ServerSentEvent.builder("baz").id("3").build());
sink.tryEmitComplete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n");
}
@ -239,9 +239,9 @@ public class ReactiveTypeHandlerTests {
Bar bar1 = new Bar("foo");
Bar bar2 = new Bar("bar");
sink.emitNext(bar1);
sink.emitNext(bar2);
sink.emitComplete();
sink.tryEmitNext(bar1);
sink.tryEmitNext(bar2);
sink.tryEmitComplete();
assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/x-ndjson");
assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n"));
@ -256,10 +256,10 @@ public class ReactiveTypeHandlerTests {
EmitterHandler emitterHandler = new EmitterHandler();
emitter.initialize(emitterHandler);
sink.emitNext("The quick");
sink.emitNext(" brown fox jumps over ");
sink.emitNext("the lazy dog");
sink.emitComplete();
sink.tryEmitNext("The quick");
sink.tryEmitNext(" brown fox jumps over ");
sink.tryEmitNext("the lazy dog");
sink.tryEmitComplete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog");
}

View File

@ -233,10 +233,10 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
assertThat(this.request.isAsyncStarted()).isTrue();
assertThat(this.response.getStatus()).isEqualTo(200);
sink.emitNext("foo");
sink.emitNext("bar");
sink.emitNext("baz");
sink.emitComplete();
sink.tryEmitNext("foo");
sink.tryEmitNext("bar");
sink.tryEmitNext("baz");
sink.tryEmitComplete();
assertThat(this.response.getContentType()).isEqualTo("text/event-stream");
assertThat(this.response.getContentAsString()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
@ -254,8 +254,8 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
assertThat(this.request.isAsyncStarted()).isTrue();
IllegalStateException ex = new IllegalStateException("wah wah");
sink.emitError(ex);
sink.emitComplete();
sink.tryEmitError(ex);
sink.tryEmitComplete();
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.webRequest);
assertThat(asyncManager.getConcurrentResult()).isSameAs(ex);
@ -299,10 +299,10 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
assertThat(this.request.isAsyncStarted()).isTrue();
assertThat(this.response.getStatus()).isEqualTo(200);
sink.emitNext("foo");
sink.emitNext("bar");
sink.emitNext("baz");
sink.emitComplete();
sink.tryEmitNext("foo");
sink.tryEmitNext("bar");
sink.tryEmitNext("baz");
sink.tryEmitComplete();
assertThat(this.response.getContentType()).isEqualTo("text/plain");
assertThat(this.response.getContentAsString()).isEqualTo("foobarbaz");